| Overall Statistics | |
|---|---|
| Total Orders | 130 |
| Average Win | 1.16% |
| Average Loss | -1.12% |
| Compounding Annual Return | 35.719% |
| Drawdown | 13.800% |
| Expectancy | 0.221 |
| Start Equity | 100000 |
| End Equity | 116445.5 |
| Net Profit | 16.446% |
| Sharpe Ratio | 1.31 |
| Sortino Ratio | 1.034 |
| Probabilistic Sharpe Ratio | 66.337% |
| Loss Rate | 40% |
| Win Rate | 60% |
| Profit-Loss Ratio | 1.03 |
| Alpha | 0.181 |
| Beta | 0.256 |
| Annual Standard Deviation | 0.149 |
| Annual Variance | 0.022 |
| Information Ratio | 0.675 |
| Tracking Error | 0.209 |
| Treynor Ratio | 0.76 |
| Total Fees | $279.50 |
| Estimated Strategy Capacity | $91000000.00 |
| Lowest Capacity Asset | ES YTG30NVEFCW1 |
| Portfolio Turnover | 203.39% |
| Drawdown Recovery | 66 |
from AlgorithmImports import *
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np
from datetime import timedelta
class WalkForwardMLIntraday(QCAlgorithm):
    """Walk-forward ML intraday strategy on E-mini S&P 500 futures.

    Each morning a classifier (XGBoost or random forest, see
    ``self.model_type``) receives features built from the 08:01-08:59
    minute bars and predicts whether the 09:00-11:00 return will exceed
    ``self.threshold``.  On a positive prediction one front-month contract
    is bought at 09:00 and liquidated at 11:00.  The model is refit from a
    rolling history window every ``self.retrain_frequency`` days.

    All clock times are in algorithm time, which ``Initialize`` sets to the
    future's exchange time zone.
    """

    # Bar timestamps that feed the features.  The same window is applied to
    # live data and to every historical training day so that the model is
    # trained on exactly the kind of input it later predicts from.
    FEATURE_WINDOW = ("08:01", "08:59")
    # Minimum number of bars required inside FEATURE_WINDOW.
    MIN_FEATURE_BARS = 58
    # Minimum number of (features, label) days required to fit a model.
    MIN_TRAINING_SAMPLES = 10

    def Initialize(self):
        """Configure the backtest, the ES subscription, state and schedule."""
        self.SetStartDate(2025, 1, 1)  # Adjust start date as needed
        self.SetEndDate(2025, 7, 1)  # Adjust end date as needed
        self.SetCash(100000)

        # Add ES futures with minute resolution
        self.future = self.AddFuture(
            Futures.Indices.SP500EMini,
            Resolution.Minute,
            extendedMarketHours=True,
            fillForward=True,
        )
        self.future.SetFilter(timedelta(0), timedelta(180))  # Front-month and near

        # Run the algorithm clock in the futures exchange time zone
        self.SetTimeZone(self.future.Exchange.TimeZone)

        # Parameters
        self.symbol = self.future.Symbol
        self.lookback_days = 30  # Training window, in calendar days
        self.retrain_frequency = 5  # Retrain when this many calendar days have passed
        self.model_type = "XGBoost"  # Or "RandomForest" - switch here to test both
        self.threshold = 0.0002  # Label threshold (0.02%)

        # State variables
        self.trade_count = 0
        self.model = None
        self.last_retrain_date = None
        self.daily_data = {}  # {date: [bar dict, ...]}; holds the current day only
        self.prediction = 0  # 0: flat, 1: long
        self.position_opened = False
        self.contract_symbol = None  # Contract chosen for today's trade

        # Schedule events in algorithm (exchange) time:
        # 09:59 ET = 08:59 CT, 10:00 ET = 09:00 CT, 12:00 ET = 11:00 CT
        self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.At(8, 59), self.GeneratePrediction)
        self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.At(9, 0), self.EnterTrade)
        self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.At(11, 0), self.ExitTrade)

        # Warm-up with history
        self.SetWarmUp(timedelta(days=60))

    def OnData(self, slice):
        """Cache today's minute bars of ``self.symbol`` for feature building."""
        if self.IsWarmingUp:
            return

        current_date = self.Time.date()
        if current_date not in self.daily_data:
            # Only today's bars are ever read back, so drop earlier sessions
            # instead of letting the cache grow for the whole backtest.
            self.daily_data = {current_date: []}

        if self.symbol in slice.Bars:
            bar = slice.Bars[self.symbol]
            self.daily_data[current_date].append({
                'time': self.Time,
                'open': bar.Open,
                'high': bar.High,
                'low': bar.Low,
                'close': bar.Close,
                'volume': bar.Volume,
            })

    def GeneratePrediction(self):
        """Retrain if due, then set ``self.prediction`` for today.

        On a long prediction (1) the earliest-expiring listed contract is
        stored in ``self.contract_symbol``; if none is available the
        prediction is reset to 0.
        """
        # OnData caches nothing during warm-up, so there is nothing to
        # predict from; skip the training and prediction work entirely.
        if self.IsWarmingUp:
            return

        current_date = self.Time.date()
        retrain_due = (
            self.last_retrain_date is None
            or (current_date - self.last_retrain_date).days >= self.retrain_frequency
        )
        # Advance the retrain clock only when a model was actually fitted,
        # so a failed attempt is retried on the next session.
        if retrain_due and self.TrainModel():
            self.last_retrain_date = current_date

        # Build features for today (09:01-09:59 ET = 08:01-08:59 CT)
        df_today = self.GetTodayDataFrame()
        if df_today is None or len(df_today) < self.MIN_FEATURE_BARS:
            self.prediction = 0
            self.Debug(f"Insufficient data for {current_date}: len={len(df_today) if df_today is not None else 'None'}")
            return

        features = self.BuildFeatures(df_today)
        if features is None:
            self.prediction = 0
            self.Debug(f"Features None for {current_date}")
            return

        if self.model:
            # Cast to a plain int so later comparisons do not depend on the
            # numpy scalar type returned by the model.
            self.prediction = int(self.model.predict(np.array([features]))[0])
            self.Debug(f"Prediction for {current_date}: {self.prediction} (1=Long, 0=Flat)")
        else:
            self.prediction = 0

        # If positive prediction, get the front-month contract
        if self.prediction == 1:
            # Materialise the result so emptiness and min() behave the same
            # whatever collection type the provider hands back.
            contracts = list(self.FutureChainProvider.GetFutureContractList(self.symbol, self.Time))
            if contracts:
                # Front month = earliest expiry (symbol.ID.Date)
                self.contract_symbol = min(contracts, key=lambda c: c.ID.Date)
            else:
                self.prediction = 0  # Cannot trade without contract
                self.Debug(f"No futures contracts available for {current_date}")

    def EnterTrade(self):
        """Buy one contract when today's prediction is long and we are flat."""
        if self.prediction == 1 and not self.position_opened and self.contract_symbol:
            self.MarketOrder(self.contract_symbol, 1)  # Buy 1 contract of front-month
            self.position_opened = True
            self.trade_count += 1
            self.Debug(f"Entered long at 10:00 ET (09:00 CT) on {self.Time.date()} with {self.contract_symbol}")

    def ExitTrade(self):
        """Liquidate the position opened by ``EnterTrade`` and reset state."""
        if self.position_opened and self.contract_symbol:
            self.Liquidate(self.contract_symbol)
            self.position_opened = False
            self.contract_symbol = None  # Reset for next day
            self.Debug(f"Exited at 12:00 ET (11:00 CT) on {self.Time.date()}")

    def TrainModel(self):
        """Fit a fresh classifier on the rolling history window.

        Returns:
            True when a new model was fitted and stored in ``self.model``;
            False when training was skipped (no history, too few samples,
            or only one label class), in which case any previous model is
            kept.

        Raises:
            ValueError: if ``self.model_type`` is not a supported name.
        """
        df_history = self.GetHistoricalData()
        if df_history is None:
            return False

        features_list = []
        labels_list = []
        for _, df_day in df_history.groupby(df_history.index.date):
            # Features must come from the same pre-entry window used for
            # live prediction; the full day would include bars that lie
            # after the label window.
            feats = self.BuildFeatures(df_day.between_time(*self.FEATURE_WINDOW))
            label = self.BuildLabel(df_day)
            if feats is not None and label is not None:
                features_list.append(feats)
                labels_list.append(label)

        if len(features_list) < self.MIN_TRAINING_SAMPLES:
            self.Debug("Insufficient data for training")
            return False

        X = np.array(features_list)
        y = np.array(labels_list)

        # Simple split for validation (or use TimeSeriesSplit in full impl)
        # NOTE(review): the split is shuffled, so the reported accuracy is
        # not an out-of-time estimate.
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # A one-class sample cannot teach a classifier anything useful.
        if len(np.unique(y_train)) < 2:
            self.Debug("Training labels contain a single class; keeping previous model")
            return False

        if self.model_type == "XGBoost":
            model = xgb.XGBClassifier(n_estimators=50, max_depth=3, learning_rate=0.05, random_state=42)
        elif self.model_type == "RandomForest":
            model = RandomForestClassifier(n_estimators=50, max_depth=3, random_state=42)
        else:
            raise ValueError("Invalid model_type")

        model.fit(X_train, y_train)
        # Replace the live model only once the new one is fully fitted.
        self.model = model

        # Quick eval
        acc = accuracy_score(y_test, model.predict(X_test))
        self.Debug(f"Model trained on {len(X_train)} samples. Test accuracy: {acc:.2f}")
        return True

    def GetHistoricalData(self):
        """Return minute history for the lookback window, or None if empty.

        The window ends at midnight of the current day, so yesterday's
        whole session is included while nothing from today is.  The frame
        is indexed by the history ``time`` column (renamed ``datetime``)
        with capitalised OHLCV column names.
        """
        end = self.Time.replace(hour=0, minute=0, second=0, microsecond=0)
        start = end - timedelta(days=self.lookback_days)
        history = self.History(self.symbol, start, end, Resolution.Minute)
        if history.empty:
            return None

        df = history.reset_index()
        df = df.rename(columns={'time': 'datetime', 'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close', 'volume': 'Volume'})
        df.set_index('datetime', inplace=True)
        return df

    def GetTodayDataFrame(self):
        """Return today's cached bars inside FEATURE_WINDOW, or None.

        None is returned when fewer than ``MIN_FEATURE_BARS`` bars have
        been cached for the current date.
        """
        current_date = self.Time.date()
        bars = self.daily_data.get(current_date)
        if bars is None or len(bars) < self.MIN_FEATURE_BARS:
            return None

        df = pd.DataFrame(bars)
        df['datetime'] = df['time']
        df = df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close', 'volume': 'Volume'})
        df.set_index('datetime', inplace=True)
        # 09:01-09:59 ET = 08:01-08:59 CT
        # NOTE(review): the original "relaxed" 58-bar minimum suggests the
        # 08:59 bar may not have arrived when this runs - confirm.
        return df.between_time(*self.FEATURE_WINDOW)

    @staticmethod
    def _vwap(close, volume):
        """Volume-weighted mean of ``close``; last close if volume is zero."""
        total_volume = volume.sum()
        if total_volume > 0:
            return (close * volume).sum() / total_volume
        return close.iloc[-1]

    def BuildFeatures(self, df):
        """Build the feature vector from one day's pre-entry bars.

        Args:
            df: DataFrame indexed by timestamp with ``Close``, ``Volume``,
                ``High`` and ``Low`` columns, already restricted to the
                feature window by the caller.

        Returns:
            A list of eight values ``[ret_5m, ret_10m, open_close_range,
            vwap, price_above_vwap, vwap_slope, early_range,
            volume_imbalance]``, or None when there are fewer than
            ``MIN_FEATURE_BARS`` rows or any value is NaN or infinite.
        """
        if len(df) < self.MIN_FEATURE_BARS:
            return None

        close = df['Close']
        volume = df['Volume']
        last_close = close.iloc[-1]

        # Price features
        ret_5m = close.pct_change(5).iloc[-1]
        ret_10m = close.pct_change(10).iloc[-1]
        open_close_range = (last_close - close.iloc[0]) / close.iloc[0]

        # VWAP features (fall back to the last close on zero volume)
        vwap = self._vwap(close, volume)
        price_above_vwap = last_close / vwap if vwap != 0 else 1.0

        # Change in VWAP between the first and last ten bars
        vwap_late = self._vwap(close.iloc[-10:], volume.iloc[-10:])
        vwap_early = self._vwap(close.iloc[:10], volume.iloc[:10])
        vwap_slope = vwap_late - vwap_early

        # 09:01-09:30 ET = 08:01-08:30 CT
        early = df.between_time("08:01", "08:30")
        early_range = early['High'].max() - early['Low'].min()

        # max(1, ...) keeps the ratio defined when the first bars have no volume
        volume_imbalance = volume.iloc[-5:].sum() / max(1, volume.iloc[:5].sum())

        features = [ret_5m, ret_10m, open_close_range, vwap, price_above_vwap, vwap_slope, early_range, volume_imbalance]
        # Reject NaN and +/-inf alike; neither is a usable model input.
        if not np.all(np.isfinite(features)):
            return None
        return features

    def BuildLabel(self, df):
        """Return the training label for one day, or None if bars are missing.

        The label is 1 when the return from the ``Open`` of the 09:01 bar
        to the ``Close`` of the 11:00 bar exceeds ``self.threshold``,
        otherwise 0 (10:00-12:00 ET = 09:00-11:00 CT).
        """
        entry_df = df.between_time("09:01", "09:01")
        exit_df = df.between_time("11:00", "11:00")
        if entry_df.empty or exit_df.empty:
            return None

        entry = entry_df['Open'].iloc[0]
        exit_price = exit_df['Close'].iloc[0]
        ret = (exit_price / entry) - 1
        return 1 if ret > self.threshold else 0

    def OnEndOfAlgorithm(self):
        """Log the number of trades entered during the run."""
        self.Debug(f"Total trades: {self.trade_count}")