| Overall Statistics | |
| --- | --- |
| Total Orders | 258 |
| Average Win | 0.40% |
| Average Loss | -0.32% |
| Compounding Annual Return | 14.029% |
| Drawdown | 9.000% |
| Expectancy | 0.441 |
| Start Equity | 100000 |
| End Equity | 118233.66 |
| Net Profit | 18.234% |
| Sharpe Ratio | 0.861 |
| Sortino Ratio | 0.961 |
| Probabilistic Sharpe Ratio | 44.067% |
| Loss Rate | 36% |
| Win Rate | 64% |
| Profit-Loss Ratio | 1.26 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0.108 |
| Annual Variance | 0.012 |
| Information Ratio | 0.936 |
| Tracking Error | 0.108 |
| Treynor Ratio | 0 |
| Total Fees | $237.38 |
| Estimated Strategy Capacity | $2700000000.00 |
| Lowest Capacity Asset | SPY R735QTJ8XC9X |
| Portfolio Turnover | 14.32% |
| Drawdown Recovery | 120 |
# region imports
from AlgorithmImports import *
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from datetime import timedelta
# endregion
class TensorFlowAlgorithm(QCAlgorithm):
"""
Enhanced ML Trading Algorithm with 30 Engineered Features + LEVERAGE
PHASE 1 ENHANCEMENTS (Leverage & Aggressive Sizing):
- Base leverage: 1.75x (conservative start)
- Max leverage: 2.5x (cap for safety)
- Confidence multiplier: 75 (vs 50 baseline)
- Tiered position sizing based on prediction confidence:
* Very High (>1.5%): 1.5x multiplier
* High (>1.0%): 1.25x multiplier
* Medium (>0.5%): 1.0x multiplier
* Low (<0.5%): 0.75x multiplier
Based on backtest results showing:
- 58% win rate (vs 36% baseline)
- 1.45 profit-loss ratio
- 0.59 Sharpe ratio (vs 0.372 baseline)
Original research: 52.61% directional accuracy with 30 features
Improvements over baseline:
- 30 features (vs 5 simple price diffs)
- Advanced architecture with BatchNormalization
- Cross-asset context (QQQ, IWM, TLT, GLD)
- Proper feature scaling
- LEVERAGE to amplify edge (NEW)
"""
def initialize(self) -> None:
self.set_start_date(2021, 6, 22) # Set Start Date
self.set_end_date(2022, 10, 1)
self.set_cash(100000) # Set Strategy Cash
# Enable leverage for this strategy
self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
# Add primary trading symbol
self.spy = self.add_equity("SPY", Resolution.DAILY).symbol
# Add cross-asset context symbols (for feature calculation)
self.context_symbols = {
'QQQ': self.add_equity("QQQ", Resolution.DAILY).symbol,
'IWM': self.add_equity("IWM", Resolution.DAILY).symbol,
'TLT': self.add_equity("TLT", Resolution.DAILY).symbol,
'GLD': self.add_equity("GLD", Resolution.DAILY).symbol
}
# Top 30 features (from research consensus ranking)
# NOTE: Removed price_position_252d due to excessive data requirements
# Using shorter alternatives to ensure we have valid data
self.selected_features = [
'price_to_ma_200', 'realized_vol_5d', 'rel_return_GLD', 'return_1d',
'price_ratio_TLT', 'return_60d', 'rel_return_TLT', 'gap',
'log_return_1d', 'price_to_ma_50', 'price_ratio_GLD', 'ma_50_200_ratio',
'gld_spy_ratio_z', 'volume_trend_20d', 'price_to_ma_10', 'realized_vol_20d',
'realized_vol_10d', 'return_2d', 'realized_vol_60d', 'return_acceleration_5d',
'bb_width', 'price_to_ma_20', 'price_position_60d', # Changed from 252d to 60d
'vol_QQQ', 'atr_28', 'hl_range', 'rsi_28', 'macd_histogram',
'atr_14', 'parkinson_vol_60d'
]
# Model hyperparameters (from research optimization)
self.num_features = 30
self.epochs = 300
self.learning_rate = 0.0005
self.batch_size = 32
# Create advanced MLP model with BatchNormalization
self.model = tf.keras.Sequential([
tf.keras.layers.Dense(128, kernel_regularizer=tf.keras.regularizers.l2(0.001), input_shape=(self.num_features,)),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.LeakyReLU(negative_slope=0.01),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(64, kernel_regularizer=tf.keras.regularizers.l2(0.001)),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.LeakyReLU(negative_slope=0.01),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(32, kernel_regularizer=tf.keras.regularizers.l2(0.001)),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.LeakyReLU(negative_slope=0.01),
tf.keras.layers.Dropout(0.3),
tf.keras.layers.Dense(16, kernel_regularizer=tf.keras.regularizers.l2(0.001)),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.LeakyReLU(negative_slope=0.01),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(1, activation='linear')
])
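# Architecture summary (as built above): 128 -> 64 -> 32 -> 16 -> 1 linear output,
# each hidden layer followed by BatchNormalization, LeakyReLU(0.01) and Dropout,
# with L2 weight decay of 0.001 on every Dense layer.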
# Compile model
self.model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
loss='mse',
metrics=['mae']
)
# Feature scaler (fitted on training data)
self.scaler = StandardScaler()
self.scaler_fitted = False
# ================================================================
# LEVERAGE & POSITION SIZING PARAMETERS (Phase 1 Enhancement)
# ================================================================
# Base leverage multiplier - with 58% win rate, we can be more aggressive
self.base_leverage = 1.75 # 1.75x leverage (conservative start)
self.max_leverage = 2.5 # Cap maximum exposure at 2.5x
# Aggressive confidence multiplier (increased from 50 to 75)
# This means a prediction of 0.02 (2%) -> 1.5x position (vs 1.0x before)
self.confidence_multiplier = 75
# Confidence thresholds for tiered position sizing
self.confidence_tiers = {
'very_high': 0.015, # >1.5% prediction
'high': 0.010, # >1.0% prediction
'medium': 0.005, # >0.5% prediction
'low': 0.0 # <0.5% prediction
}
# Tier multipliers (applied on top of base sizing)
self.tier_multipliers = {
'very_high': 1.5, # Boost very confident signals by 50%
'high': 1.25, # Boost high confidence by 25%
'medium': 1.0, # Normal sizing
'low': 0.75 # Reduce low confidence by 25%
}
self.debug(f"✓ Leverage settings: base={self.base_leverage}x, max={self.max_leverage}x, confidence_mult={self.confidence_multiplier}")
# ================================================================
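# Illustrative sizing walk-through (hypothetical numbers, for reference only):
#   prediction = +0.012  -> confidence = 0.012 -> tier = 'high'
#   base_size      = 0.012 * 75      = 0.90
#   tier_adjusted  = 0.90  * 1.25    = 1.125
#   leveraged_size = 1.125 * 1.75    ~= 1.97
#   position_size  = min(2.5, 1.97)  = 1.97x of portfolio, long SPY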
# Rolling window for training data
# With 200-day MA as longest feature, need ~250+ days minimum
# Using 500 bars leaves roughly 300 usable rows once the 200-day features drop their leading NaNs
self.training_length = 500 # ~2 years of data
self.min_training_length = 300 # Minimum before we start trading
# Store OHLCV history for all symbols
self.price_data = {}
# Warm up with historical data
# Need enough warmup to cover training_length
self.warmup_period = self.training_length + 50 # Extra buffer
self.set_warm_up(timedelta(days=self.warmup_period))
# Train immediately after warmup
self.train(self.my_training_method)
# Retrain weekly (first trading day of the week, one minute after the open)
self.train(self.date_rules.week_start(), self.time_rules.at(9, 31), self.my_training_method)
self.debug(f"✓ Initialized with {self.num_features} features and advanced architecture")
def calculate_features(self, history_df):
"""
Calculate all 30 selected features from OHLCV history
Args:
history_df: Dict with keys 'SPY', 'QQQ', 'IWM', 'TLT', 'GLD'
Each value is a DataFrame with columns: open, high, low, close, volume
Returns:
DataFrame with 30 features, or None if insufficient data
"""
try:
self.debug(f"[calculate_features] Starting feature calculation...")
self.debug(f"[calculate_features] Available symbols: {list(history_df.keys())}")
# Extract SPY data
spy_df = history_df['SPY']
self.debug(f"[calculate_features] SPY DataFrame shape: {spy_df.shape}")
self.debug(f"[calculate_features] SPY columns: {list(spy_df.columns)}")
spy_close = spy_df['close']
spy_open = spy_df['open']
spy_high = spy_df['high']
spy_low = spy_df['low']
spy_volume = spy_df['volume']
self.debug(f"[calculate_features] SPY close: {len(spy_close)} values, first={spy_close.iloc[0] if len(spy_close) > 0 else 'N/A'}")
# Initialize features dictionary
features = pd.DataFrame(index=spy_close.index)
self.debug(f"[calculate_features] Initialized features DataFrame with {len(features)} rows")
# ================================================================
# RETURNS & MOMENTUM (5 features in top 30)
# ================================================================
features['return_1d'] = spy_close.pct_change(1)
features['return_2d'] = spy_close.pct_change(2)
features['return_60d'] = spy_close.pct_change(60)
features['log_return_1d'] = np.log(spy_close / spy_close.shift(1))
return_momentum_5d = features['return_1d'].rolling(5).mean()
features['return_acceleration_5d'] = return_momentum_5d.diff()
# ================================================================
# TECHNICAL INDICATORS (8 features in top 30)
# ================================================================
# Moving averages
ma_10 = spy_close.rolling(10).mean()
ma_20 = spy_close.rolling(20).mean()
ma_50 = spy_close.rolling(50).mean()
ma_200 = spy_close.rolling(200).mean()
features['price_to_ma_10'] = spy_close / ma_10 - 1
features['price_to_ma_20'] = spy_close / ma_20 - 1
features['price_to_ma_50'] = spy_close / ma_50 - 1
features['price_to_ma_200'] = spy_close / ma_200 - 1
features['ma_50_200_ratio'] = ma_50 / ma_200 - 1
# Price position in range - use 60-day window instead of 252-day
rolling_min_60 = spy_close.rolling(60).min()
rolling_max_60 = spy_close.rolling(60).max()
features['price_position_60d'] = (spy_close - rolling_min_60) / (rolling_max_60 - rolling_min_60 + 1e-10)
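# price_position_60d lies in [0, 1]: 0 at the trailing 60-day low, 1 at the 60-day high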
# Also compute the 252d version for reference; it is not in the selected feature set
rolling_min = spy_close.rolling(252).min()
rolling_max = spy_close.rolling(252).max()
features['price_position_252d'] = (spy_close - rolling_min) / (rolling_max - rolling_min + 1e-10)
# RSI
def calculate_rsi(prices, period=28):
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / (loss + 1e-10)
return 100 - (100 / (1 + rs))
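# Note: this is the simple-moving-average RSI variant; Wilder's original RSI uses
# exponential smoothing of gains/losses, so values can differ slightly.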
features['rsi_28'] = calculate_rsi(spy_close, 28)
# MACD
ema_12 = spy_close.ewm(span=12, adjust=False).mean()
ema_26 = spy_close.ewm(span=26, adjust=False).mean()
macd = ema_12 - ema_26
macd_signal = macd.ewm(span=9, adjust=False).mean()
features['macd_histogram'] = macd - macd_signal
# ================================================================
# VOLATILITY (9 features in top 30)
# ================================================================
returns = spy_close.pct_change()
features['realized_vol_5d'] = returns.rolling(5).std() * np.sqrt(252)
features['realized_vol_10d'] = returns.rolling(10).std() * np.sqrt(252)
features['realized_vol_20d'] = returns.rolling(20).std() * np.sqrt(252)
features['realized_vol_60d'] = returns.rolling(60).std() * np.sqrt(252)
# Parkinson volatility
def parkinson_volatility(high, low, window=60):
hl_ratio = np.log(high / (low + 1e-10))
return hl_ratio.rolling(window).apply(
lambda x: np.sqrt(np.mean(x**2) / (4 * np.log(2))) * np.sqrt(252)
)
features['parkinson_vol_60d'] = parkinson_volatility(spy_high, spy_low, 60)
# ATR
def calculate_atr(high, low, close, period=14):
high_low = high - low
high_close = np.abs(high - close.shift())
low_close = np.abs(low - close.shift())
true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
return true_range.rolling(period).mean()
features['atr_14'] = calculate_atr(spy_high, spy_low, spy_close, 14)
features['atr_28'] = calculate_atr(spy_high, spy_low, spy_close, 28)
# Bollinger Bands
sma_20 = spy_close.rolling(20).mean()
rolling_std = spy_close.rolling(20).std()
bb_upper = sma_20 + (rolling_std * 2)
bb_lower = sma_20 - (rolling_std * 2)
features['bb_width'] = (bb_upper - bb_lower) / (sma_20 + 1e-10)
# ================================================================
# VOLUME (1 feature in top 30)
# ================================================================
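# volume_trend_20d: least-squares slope of raw volume over the trailing 20 bars
# (positive = volume trending up, negative = trending down)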
features['volume_trend_20d'] = spy_volume.rolling(20).apply(
lambda x: np.polyfit(np.arange(len(x)), x, 1)[0] if len(x) > 1 else 0
)
# ================================================================
# MICROSTRUCTURE (2 features in top 30)
# ================================================================
features['hl_range'] = (spy_high - spy_low) / (spy_close + 1e-10)
features['gap'] = (spy_open - spy_close.shift(1)) / (spy_close.shift(1) + 1e-10)
# ================================================================
# CROSS-ASSET (5 features in top 30)
# ================================================================
# GLD features
if 'GLD' in history_df:
gld_close = history_df['GLD']['close']
gld_return = gld_close.pct_change()
features['rel_return_GLD'] = spy_close.pct_change() - gld_return
features['price_ratio_GLD'] = (spy_close / gld_close) / (spy_close / gld_close).rolling(20).mean() - 1
gld_spy_ratio = gld_close / spy_close
gld_spy_ratio_ma = gld_spy_ratio.rolling(60).mean()
gld_spy_ratio_std = gld_spy_ratio.rolling(60).std()
features['gld_spy_ratio_z'] = (gld_spy_ratio - gld_spy_ratio_ma) / (gld_spy_ratio_std + 1e-10)
# TLT features
if 'TLT' in history_df:
tlt_close = history_df['TLT']['close']
tlt_return = tlt_close.pct_change()
features['rel_return_TLT'] = spy_close.pct_change() - tlt_return
features['price_ratio_TLT'] = (spy_close / tlt_close) / (spy_close / tlt_close).rolling(20).mean() - 1
# QQQ volatility
if 'QQQ' in history_df:
qqq_close = history_df['QQQ']['close']
features['vol_QQQ'] = qqq_close.pct_change().rolling(20).std() * np.sqrt(252)
# Select only the 30 features we need (in the correct order)
self.debug(f"[calculate_features] Selecting {len(self.selected_features)} features from {len(features.columns)} calculated")
# Check which features exist
missing_features = [f for f in self.selected_features if f not in features.columns]
if missing_features:
self.debug(f"[calculate_features] WARNING: Missing features: {missing_features[:5]}...")
features_subset = features[self.selected_features].copy()
self.debug(f"[calculate_features] Features subset shape before dropna: {features_subset.shape}")
# Check NaN count per feature
nan_counts = features_subset.isna().sum()
features_with_nans = nan_counts[nan_counts > 0].sort_values(ascending=False)
if len(features_with_nans) > 0:
self.debug(f"[calculate_features] Features with NaN (top 5): {dict(list(features_with_nans.head(5).items()))}")
# Drop rows with NaN (due to rolling windows)
features_subset = features_subset.dropna()
self.debug(f"[calculate_features] Features subset shape after dropna: {features_subset.shape}")
self.debug(f"[calculate_features] Feature calculation SUCCESS - returning {len(features_subset)} valid rows")
return features_subset
except Exception as e:
self.debug(f"[calculate_features] ERROR: {str(e)}")
import traceback
self.debug(f"[calculate_features] Traceback: {traceback.format_exc()}")
return None
def get_history_dataframes(self):
"""
Get historical OHLCV data for all symbols as DataFrames
Returns:
Dict with symbol keys and DataFrame values
"""
history_dict = {}
self.debug(f"[get_history_dataframes] Requesting {self.training_length} bars of history...")
# Get SPY history
spy_history = self.history(self.spy, self.training_length, Resolution.DAILY)
self.debug(f"[get_history_dataframes] SPY history: {len(spy_history)} bars, empty={spy_history.empty}")
if not spy_history.empty:
# Reset index to use integer positions instead of timestamps
history_dict['SPY'] = pd.DataFrame({
'open': spy_history['open'].values,
'high': spy_history['high'].values,
'low': spy_history['low'].values,
'close': spy_history['close'].values,
'volume': spy_history['volume'].values
})
self.debug(f"[get_history_dataframes] SPY DataFrame created: shape={history_dict['SPY'].shape}")
else:
self.debug(f"[get_history_dataframes] ERROR: SPY history is empty!")
# Get context symbols history
for name, symbol in self.context_symbols.items():
context_history = self.history(symbol, self.training_length, Resolution.DAILY)
self.debug(f"[get_history_dataframes] {name} history: {len(context_history)} bars, empty={context_history.empty}")
if not context_history.empty:
# Reset index to match SPY - use integer positions
history_dict[name] = pd.DataFrame({
'open': context_history['open'].values,
'high': context_history['high'].values,
'low': context_history['low'].values,
'close': context_history['close'].values,
'volume': context_history['volume'].values
})
self.debug(f"[get_history_dataframes] {name} DataFrame created: shape={history_dict[name].shape}")
else:
self.debug(f"[get_history_dataframes] WARNING: {name} history is empty - skipping")
self.debug(f"[get_history_dataframes] Total symbols loaded: {list(history_dict.keys())}")
return history_dict
def my_training_method(self) -> None:
"""
Train the model with advanced architecture and proper feature engineering
"""
if self.is_warming_up:
self.debug("[my_training_method] Still warming up, skipping training")
return
self.debug("[my_training_method] Starting training cycle...")
try:
# Get historical data for all symbols
history_dict = self.get_history_dataframes()
if 'SPY' not in history_dict:
self.debug(f"[my_training_method] ERROR: SPY not in history_dict. Keys: {list(history_dict.keys())}")
return
spy_len = len(history_dict['SPY'])
self.debug(f"[my_training_method] SPY data length: {spy_len}, min required: {self.min_training_length}")
if spy_len < self.min_training_length:
self.debug(f"[my_training_method] Insufficient data for training: {spy_len} bars (need {self.min_training_length})")
return
# Calculate features
self.debug(f"[my_training_method] Calculating features from {spy_len} bars...")
features_df = self.calculate_features(history_dict)
if features_df is None:
self.debug(f"[my_training_method] ERROR: calculate_features returned None")
return
features_len = len(features_df)
self.debug(f"[my_training_method] Features calculated: {features_len} rows x {features_df.shape[1] if hasattr(features_df, 'shape') else '?'} cols")
if features_len < 100:
self.debug(f"[my_training_method] Insufficient features calculated: {features_len} (need 100+)")
return
# Create target: next-day return
spy_close = history_dict['SPY']['close']
# Align target with features index
target = spy_close.pct_change().shift(-1)
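# shift(-1) makes the target at row t the close-to-close return from day t to t+1,
# i.e. the next-day return the model is trained to predict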
target = target.loc[features_df.index]
# Remove last row (no target for most recent day)
features_df = features_df.iloc[:-1]
target = target.iloc[:-1]
# Drop any remaining NaN
valid_idx = ~target.isna()
X = features_df[valid_idx].values
y = target[valid_idx].values
if len(X) < 100:
self.debug(f" Insufficient valid samples: {len(X)}")
return
# Fit the scaler on the first training pass only; later retrains reuse the same fit
if not self.scaler_fitted:
self.scaler.fit(X)
self.scaler_fitted = True
# Scale features
X_scaled = self.scaler.transform(X)
# Train model with early stopping
early_stop = tf.keras.callbacks.EarlyStopping(
monitor='loss',
patience=30,
restore_best_weights=True
)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
monitor='loss',
factor=0.5,
patience=15,
min_lr=1e-6
)
# Train
self.model.fit(
X_scaled, y,
epochs=self.epochs,
batch_size=self.batch_size,
verbose=0,
callbacks=[early_stop, reduce_lr],
validation_split=0.2
)
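# Note: validation_split in Keras takes the last 20% of rows without shuffling,
# so with time-ordered samples this behaves as a rough most-recent holdout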
self.debug(f" Model trained on {len(X)} samples with {self.num_features} features")
except Exception as e:
self.debug(f"Training error: {str(e)}")
import traceback
self.debug(f"Traceback: {traceback.format_exc()}")
def on_data(self, data: Slice) -> None:
"""
Make predictions and execute trades based on model output
"""
if self.is_warming_up:
return
if not self.scaler_fitted:
self.debug(f"[on_data] Scaler not fitted yet, skipping trade")
return
try:
# Get recent history for feature calculation
history_dict = self.get_history_dataframes()
if 'SPY' not in history_dict:
self.debug(f"[on_data] SPY not in history_dict")
return
self.debug(f"[on_data] Calculating features for prediction...")
# Calculate features
features_df = self.calculate_features(history_dict)
if features_df is None or len(features_df) == 0:
self.debug(f"[on_data] No features calculated - features_df is None or empty")
return
self.debug(f"[on_data] Got {len(features_df)} feature rows, using most recent")
# Get most recent feature row
latest_features = features_df.iloc[-1:].values
# Scale features
latest_features_scaled = self.scaler.transform(latest_features)
# Make prediction
prediction = self.model.predict(latest_features_scaled, verbose=0)[0][0]
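# predict() returns an array of shape (1, 1); [0][0] extracts the scalar
# predicted next-day return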
self.debug(f"[on_data] Prediction: {prediction:.6f}")
# ================================================================
# ENHANCED POSITION SIZING WITH LEVERAGE (Phase 1)
# ================================================================
# Get prediction confidence (absolute value)
confidence = abs(prediction)
# Determine confidence tier
if confidence >= self.confidence_tiers['very_high']:
tier = 'very_high'
elif confidence >= self.confidence_tiers['high']:
tier = 'high'
elif confidence >= self.confidence_tiers['medium']:
tier = 'medium'
else:
tier = 'low'
# Calculate base position size (more aggressive with multiplier = 75)
base_size = confidence * self.confidence_multiplier
# Apply tier multiplier
tier_adjusted_size = base_size * self.tier_multipliers[tier]
# Apply base leverage
leveraged_size = tier_adjusted_size * self.base_leverage
# Cap at maximum leverage
position_size = min(self.max_leverage, leveraged_size)
# Ensure minimum position size for valid signals
if position_size < 0.1: # Less than 10% - skip trade
self.debug(f"[on_data] Position too small ({position_size:.2%}), skipping")
return
# Trading logic: go long on a positive predicted return, short on a negative one
if prediction > 0:
# Bullish prediction - go long
self.debug(f"[on_data] BULLISH: confidence={confidence:.4f}, tier={tier}, size={position_size:.2%} ({position_size:.2f}x)")
self.set_holdings(self.spy, position_size)
else:
# Bearish prediction - go short
self.debug(f"[on_data] BEARISH: confidence={confidence:.4f}, tier={tier}, size=-{position_size:.2%} (-{position_size:.2f}x)")
self.set_holdings(self.spy, -position_size)
except Exception as e:
self.debug(f"[on_data] ERROR: {str(e)}")
import traceback
self.debug(f"[on_data] Traceback: {traceback.format_exc()}")