| Overall Statistics |
|
Total Orders 3208 Average Win 0.62% Average Loss -0.57% Compounding Annual Return -7.678% Drawdown 40.700% Expectancy -0.024 Start Equity 25000 End Equity 19112.1 Net Profit -23.552% Sharpe Ratio -0.581 Sortino Ratio -0.822 Probabilistic Sharpe Ratio 0.331% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 1.08 Alpha -0.098 Beta 0.163 Annual Standard Deviation 0.153 Annual Variance 0.023 Information Ratio -0.739 Tracking Error 0.199 Treynor Ratio -0.543 Total Fees $2083.90 Estimated Strategy Capacity $49000.00 Lowest Capacity Asset QQQ 32SUX37D9NQSM|QQQ RIWIV7K5Z9LX Portfolio Turnover 4.66% |
# region imports
from AlgorithmImports import *
# endregion
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import xgboost as xgb
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.cluster import KMeans
from sklearn.impute import SimpleImputer
class EnhancedQQQOptionsScalpingAlgorithm(QCAlgorithm):
def Initialize(self):
    """Set up the backtest: dates, cash, data feeds, models and schedules.

    The algorithm parameters ``lookback_periods``, ``prediction_horizon``,
    ``min_option_volume``, ``min_open_interest``,
    ``min_confidence_threshold``, ``profit_target`` and ``stop_loss`` are
    read with ``get_parameter`` and passed straight to ``int``/``float``.
    NOTE(review): presumably each of them has to be defined for the run,
    since a missing value would not convert - confirm.
    """
    # Set start and end dates
    self.SetStartDate(2022, 3, 1)
    self.SetEndDate(2025, 7, 10)
    self.SetCash(25000)
    self.set_brokerage_model(BrokerageName.CHARLES_SCHWAB, AccountType.MARGIN)

    # Add QQQ with minute resolution
    self.symbol = self.AddEquity("QQQ", Resolution.Minute).Symbol
    # Add options for QQQ
    self.equity_options = self.AddOption("QQQ", Resolution.Minute)
    self.equity_options.SetFilter(self.OptionFilter)
    # Add SPY for market regime detection
    self.spy_symbol = self.AddEquity("SPY", Resolution.Minute).Symbol
    # Add VIX for volatility regime. VIX is an index rather than a tradable
    # equity, so it is subscribed through AddIndex; an equity subscription
    # would never deliver bars and 'vix_close' would stay empty.
    self.vix_symbol = self.AddIndex("VIX", Resolution.Minute).Symbol
    self.set_warm_up(60)

    # Model parameters
    self.lookback_periods = int(self.get_parameter("lookback_periods"))
    self.prediction_horizon = int(self.get_parameter("prediction_horizon"))  # in minutes ahead
    self.buy_threshold = 0.001
    self.sell_threshold = -0.001

    # Options-specific parameters
    self.max_dte = 7  # Maximum days to expiration
    self.min_dte = 1  # Minimum days to expiration
    self.max_otm_distance = 0.02  # Maximum out-of-the-money distance (2%)
    self.min_volume = int(self.get_parameter("min_option_volume"))  # Minimum option volume
    self.min_open_interest = int(self.get_parameter("min_open_interest"))  # Minimum open interest
    self.quantity = 1  # number of options to trade

    # Options-specific stop loss and overnight trading settings
    self.options_stop_loss = -0.35  # 35% stop loss for options
    self.allow_overnight_trades = True  # Initial value; AdjustOptionsParameters changes it dynamically
    self.never_overnight = False  # When False, overnight trading may be re-enabled dynamically
    self.intraday_only = False  # Set to False to allow multi-day positions
    self.eod_liquidation_time = 15.5  # 3:30 PM expressed as fractional hours
    self.min_confidence_threshold = float(self.get_parameter("min_confidence_threshold"))  # confidence threshold for opening a trade

    # Initialize models for different regimes
    self.models = {
        'low_vol': self._initialize_model("random_forest"),
        'high_vol': self._initialize_model("xgboost"),
        'trending': self._initialize_model("gradient_boost"),
        'mean_reverting': self._initialize_model("logistic")
    }
    self.scalers = {regime: StandardScaler() for regime in self.models.keys()}
    self.feature_columns = []

    # Regime detection
    self.regime_detector = KMeans(n_clusters=4, random_state=42)
    self.current_regime = 'low_vol'
    self.regime_features = []
    self.regime_history = []

    # Seasonal adjustment mechanism
    self.seasonal_multiplier = 1.0
    self.seasonal_window = 15  # minimum number of history records before adjusting
    self.seasonal_history = []

    # Adaptive thresholds
    self.adaptive_buy_threshold = self.buy_threshold
    self.adaptive_sell_threshold = self.sell_threshold

    # Data storage
    self.price_data = []
    self.market_data = []
    self.options_data = []
    self.model_performance = {}
    self.regime_accuracies = {regime: 0.5 for regime in self.models.keys()}

    # Options tracking
    self.current_call_options = {}
    self.current_put_options = {}
    self.option_positions = {}

    # Performance tracking
    self.last_prediction = 0
    self.prediction_confidence = 0.0
    self.regime_confidence = 0.0

    # Schedule model retraining
    self.Schedule.On(
        self.DateRules.EveryDay(self.symbol),
        self.time_rules.before_market_close(self.symbol, 10),
        self.TrainModels
    )
    # Main trading logic
    self.Schedule.On(
        self.DateRules.EveryDay(self.symbol),
        self.TimeRules.Every(TimeSpan.FromMinutes(5)),
        self.ExecuteOptionsTrading
    )
    # Regime detection and seasonal adjustment
    self.Schedule.On(
        self.DateRules.EveryDay(self.symbol),
        self.TimeRules.Every(TimeSpan.FromMinutes(15)),
        self.DetectRegimeAndSeason
    )
    # Options management
    self.Schedule.On(
        self.DateRules.EveryDay(self.symbol),
        self.TimeRules.Every(TimeSpan.FromMinutes(1)),
        self.ManageOptionsPositions
    )
    self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(15, 58), self.DailyEODProcessing)

    # Risk management
    #self.max_position_size = 0.06 # 6%
    self.transaction_cost = 0.001
    self.max_options_allocation = 0.2  # Maximum portfolio allocation to options
    self.profit_target = float(self.get_parameter("profit_target"))  # % profit target
    self.stop_loss = -float(self.get_parameter("stop_loss"))  # stored negated (a loss fraction)
    self.trained = False
    self.debug("Enhanced QQQ Options Algorithm initialized with regime detection")
def OptionFilter(self, universe):
    """Narrow the option universe to the contracts this algorithm trades.

    Includes weeklies, keeps strikes from -10 to +10 around the underlying
    and keeps expirations between ``self.min_dte`` and ``self.max_dte``
    days out, i.e. the same day-to-expiry bounds ProcessOptionsChain
    applies to each contract.
    """
    return (
        universe.IncludeWeeklys()
        .Strikes(-10, 10)
        .Expiration(TimeSpan.FromDays(self.min_dte), TimeSpan.FromDays(self.max_dte))
    )
def OnData(self, data):
    """Store incoming data for all symbols including options.

    Does nothing while the algorithm is warming up. Otherwise it appends
    the QQQ bar to ``self.price_data``, the SPY/VIX values that are present
    to ``self.market_data``, forwards every option chain to
    ``ProcessOptionsChain`` and finally caps the history buffers.
    """
    if self.is_warming_up:
        return

    # Store QQQ data
    if data.Bars.ContainsKey(self.symbol):
        bar = data.Bars[self.symbol]
        price_info = {
            'timestamp': self.time,
            'open': float(bar.open),
            'high': float(bar.high),
            'low': float(bar.low),
            'close': float(bar.close),
            'volume': float(bar.volume)
        }
        self.price_data.append(price_info)

    # Store market data for regime detection
    market_info = {'timestamp': self.Time}
    if data.Bars.ContainsKey(self.spy_symbol):
        spy_bar = data.Bars[self.spy_symbol]
        market_info['spy_close'] = float(spy_bar.Close)
        market_info['spy_volume'] = float(spy_bar.Volume)
    if data.Bars.ContainsKey(self.vix_symbol):
        vix_bar = data.Bars[self.vix_symbol]
        market_info['vix_close'] = float(vix_bar.Close)
    # Only keep the record when something besides the timestamp was added
    if len(market_info) > 1:
        self.market_data.append(market_info)

    # Process options data
    if hasattr(data, 'OptionChains') and data.OptionChains:
        for chain in data.OptionChains.Values:
            self.ProcessOptionsChain(chain)

    # Keep data manageable: retain only the most recent records
    if len(self.price_data) > 1000:
        self.price_data = self.price_data[-1000:]
    if len(self.market_data) > 1000:
        self.market_data = self.market_data[-1000:]
    if len(self.options_data) > 500:
        self.options_data = self.options_data[-500:]
def ProcessOptionsChain(self, chain):
    """Screen one option chain and cache the contracts that pass.

    The per-strike call/put caches are emptied first. Every contract that
    survives the days-to-expiry, volume, open-interest and moneyness
    screens is appended to ``self.options_data`` and stored in the call or
    put cache under its strike.
    """
    spot = chain.Underlying.Price

    # Rebuild the quick-access caches from scratch for this chain
    self.current_call_options.clear()
    self.current_put_options.clear()

    for contract in chain:
        # Whole days left until expiry must sit inside [min_dte, max_dte]
        days_left = (contract.Expiry - self.Time).days
        if days_left > self.max_dte or days_left < self.min_dte:
            continue

        # Liquidity screens
        if contract.Volume < self.min_volume or contract.OpenInterest < self.min_open_interest:
            continue

        # Relative distance between strike and underlying price
        if abs(contract.Strike - spot) / spot > self.max_otm_distance:
            continue

        greeks = contract.Greeks
        record = {
            'symbol': contract.Symbol,
            'strike': contract.Strike,
            'expiry': contract.Expiry,
            'right': contract.Right,
            'bid': contract.BidPrice,
            'ask': contract.AskPrice,
            'last': contract.LastPrice,
            'volume': contract.Volume,
            'open_interest': contract.OpenInterest,
            'implied_volatility': contract.ImpliedVolatility,
            'delta': greeks.Delta,
            'gamma': greeks.Gamma,
            'theta': greeks.Theta,
            'vega': greeks.Vega,
            'underlying_price': spot,
            'timestamp': self.Time
        }
        self.options_data.append(record)

        # A later contract with the same strike replaces the earlier one
        is_call = contract.Right == OptionRight.Call
        cache = self.current_call_options if is_call else self.current_put_options
        cache[contract.Strike] = record
def DetectRegimeAndSeason(self):
    """Re-estimate the market regime, then refresh dependent settings.

    Skipped during warm-up or with fewer than 100 stored price bars.
    Otherwise the KMeans detector is refitted on features built from the
    last 100 bars, the newest row's cluster is translated into a regime
    name, and the seasonal multiplier and option parameters are updated.
    Any exception is reported through ``Debug`` and swallowed.
    """
    if self.is_warming_up:
        return
    if len(self.price_data) < 100:
        self.debug(f"price_data is less than 100")
        return

    try:
        frame = self.CreateEnhancedFeatures(pd.DataFrame(self.price_data[-100:]))

        # Candidate clustering inputs; option-derived ones only when present
        wanted = [
            'volatility_20', 'rsi', 'bb_position', 'momentum_20',
            'volume_ratio_20', 'vol_regime', 'trend_regime'
        ]
        wanted.extend(name for name in ('avg_iv', 'call_put_ratio') if name in frame.columns)
        present = [name for name in wanted if name in frame.columns]

        regime_data = frame[present] if len(present) > 3 else None
        if regime_data is not None and len(regime_data) > 20:
            # Refit on the current window and classify the newest row
            self.regime_detector.fit(regime_data)
            latest = regime_data.iloc[-1:].values
            cluster = self.regime_detector.predict(latest)[0]

            # Cluster index -> regime name, falling back to 'low_vol'
            names = {0: 'low_vol', 1: 'high_vol', 2: 'trending', 3: 'mean_reverting'}
            self.current_regime = names.get(cluster, 'low_vol')

            # Confidence shrinks as the distance to the nearest centre grows
            centre_distances = self.regime_detector.transform(latest)[0]
            self.regime_confidence = 1.0 / (1.0 + np.min(centre_distances))

        # Seasonal adjustment
        self.CalculateSeasonalAdjustment()
        self.AdjustOptionsParameters()
        self.Debug(f"Current regime: {self.current_regime} (confidence: {self.regime_confidence:.3f})")
    except Exception as e:
        self.Debug(f"Regime detection failed: {str(e)}")
def CreateEnhancedFeatures(self, df):
    """Create enhanced features including regime and seasonal indicators.

    Builds on ``CreateFeatures`` and adds calendar encodings, market
    (SPY/VIX) columns, option-derived columns and volatility/trend regime
    flags. The first 50 rows are discarded and any row still containing a
    NaN is dropped.

    Args:
        df: price frame with at least ``timestamp`` and the OHLCV columns
            that ``CreateFeatures`` reads.

    Returns:
        The feature frame, or an empty ``DataFrame`` when fewer than 60
        rows were supplied.
    """
    df = df.copy()
    if len(df) < 60:  # Needs to be at least 50+ rows for stable rolling
        self.debug("Insufficient data for enhanced feature generation")
        return pd.DataFrame()

    # Original features
    df = self.CreateFeatures(df)

    # Seasonal features
    df['month'] = df['timestamp'].dt.month
    df['quarter'] = df['timestamp'].dt.quarter
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['hour'] = df['timestamp'].dt.hour

    # Seasonal cyclical encoding
    df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
    df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)

    # Quarter-specific features
    df['is_q1'] = (df['quarter'] == 1).astype(int)
    df['is_q2'] = (df['quarter'] == 2).astype(int)
    df['is_q3'] = (df['quarter'] == 3).astype(int)
    df['is_q4'] = (df['quarter'] == 4).astype(int)

    # Market regime features
    # NOTE(review): these are assigned by row label, not by timestamp;
    # confirm market_data always lines up with the price rows.
    if len(self.market_data) > 50:
        market_df = pd.DataFrame(self.market_data[-len(df):])
        if 'vix_close' in market_df.columns:
            df['vix_regime'] = (market_df['vix_close'] > market_df['vix_close'].rolling(20).mean()).astype(int)
        if 'spy_close' in market_df.columns:
            df['market_momentum'] = market_df['spy_close'].pct_change(5)

    # Options-specific features
    if len(self.options_data) > 10:
        options_df = pd.DataFrame(self.options_data[-len(df):])
        if len(options_df) > 0 and 'timestamp' in options_df.columns:
            # --- Average implied volatility ---
            if 'implied_volatility' in options_df.columns:
                avg_iv_series = options_df.groupby('timestamp')['implied_volatility'].mean()
                df = df.merge(avg_iv_series.rename('avg_iv'), on='timestamp', how='left')
                # .ffill()/.bfill() replace the deprecated fillna(method=...)
                df['avg_iv'] = df['avg_iv'].ffill().bfill()

            # --- Call/Put ratio ---
            if 'volume' in options_df.columns and 'right' in options_df.columns:
                call_volume = options_df[options_df['right'] == OptionRight.Call].groupby('timestamp')['volume'].sum()
                put_volume = options_df[options_df['right'] == OptionRight.Put].groupby('timestamp')['volume'].sum()
                # Avoid divide-by-zero
                put_volume = put_volume.replace(0, np.nan)
                call_put_ratio = (call_volume / put_volume).replace([np.inf, -np.inf], np.nan)
                df = df.merge(call_put_ratio.rename('call_put_ratio'), on='timestamp', how='left')
                # Timestamps without a usable ratio are treated as balanced
                df['call_put_ratio'] = df['call_put_ratio'].fillna(1.0)

    # Volatility regime
    if 'volatility_20' in df.columns and df['volatility_20'].notna().sum() >= 50:
        df['vol_regime'] = (df['volatility_20'] > df['volatility_20'].rolling(50).mean()).astype(int)
    else:
        df['vol_regime'] = 0

    # Trend regime
    df['trend_strength'] = df['ma_5'] - df['ma_20']
    df['trend_regime'] = (df['trend_strength'] > 0).astype(int)

    # Discard the leading rows where the rolling windows are not yet filled
    df = df.iloc[50:].copy()

    # Debug any remaining NaNs
    nan_report = df.isnull().sum()
    if nan_report.any():
        self.debug("Remaining NaNs after slicing and merging:")
        self.debug(nan_report[nan_report > 0].to_string())

    # Final cleanup
    df = df.dropna()
    return df
def CalculateSeasonalAdjustment(self):
    """Update ``self.seasonal_multiplier`` from recent prediction accuracy.

    Nothing happens until ``self.seasonal_history`` holds at least
    ``self.seasonal_window`` records. The last 30 records are grouped by
    month; when the current month is among them, the multiplier becomes
    that month's mean accuracy relative to the overall mean, is scaled by
    0.8 in January-March and is clamped to [0.3, 2.0]. Errors are logged
    through ``Debug`` and swallowed.
    """
    if len(self.seasonal_history) < self.seasonal_window:
        return

    try:
        window = self.seasonal_history[-30:]
        this_month = self.Time.month

        # month -> list of accuracies seen in the window
        by_month = {}
        for entry in window:
            by_month.setdefault(entry['month'], []).append(entry['prediction_accuracy'])

        if this_month not in by_month:
            return

        month_mean = np.mean(by_month[this_month])
        overall_mean = np.mean([acc for accs in by_month.values() for acc in accs])

        # Relative performance of the current month
        if overall_mean > 0:
            self.seasonal_multiplier = month_mean / overall_mean

        # Extra haircut for the first quarter
        if this_month in [1, 2, 3]:
            self.seasonal_multiplier *= 0.8

        # Keep the multiplier within sane bounds
        self.seasonal_multiplier = np.clip(self.seasonal_multiplier, 0.3, 2.0)
    except Exception as e:
        self.Debug(f"Seasonal adjustment failed: {str(e)}")
def CreateFeatures(self, df):
    """Derive the technical-indicator columns from an OHLCV frame.

    Works on a copy of ``df`` (which must provide ``open``, ``high``,
    ``low``, ``close`` and ``volume``) and returns that copy with the
    price/volume ratios, momentum, rolling volatility, volume ratios, RSI,
    moving-average signals, Bollinger-band position and lagged changes
    added as new columns.
    """
    df = df.copy()
    close = df['close']
    volume = df['volume']

    # Price based ratios
    df['price_change'] = close.pct_change()
    df['price_change_abs'] = df['price_change'].abs()
    df['hl_ratio'] = (df['high'] - df['low']) / close
    df['oc_ratio'] = (df['open'] - close) / close

    # Volume based features
    df['volume_change'] = volume.pct_change()
    df['volume_price_trend'] = volume * df['price_change']

    # Momentum, volatility and relative volume over several windows
    for window in (3, 5, 10, 20):
        df[f'momentum_{window}'] = close.pct_change(window)
        df[f'volatility_{window}'] = close.rolling(window).std()
        volume_mean = volume.rolling(window).mean()
        df[f'volume_ma_{window}'] = volume_mean
        df[f'volume_ratio_{window}'] = volume / volume_mean

    # RSI from 14-bar average gains and losses
    step = close.diff()
    avg_gain = step.where(step > 0, 0).rolling(window=14).mean()
    avg_loss = -(step.where(step < 0, 0).rolling(window=14).mean())
    relative_strength = avg_gain / avg_loss
    df['rsi'] = 100 - (100 / (1 + relative_strength))

    # Moving averages and the relative distance of price from each
    for span in (5, 10, 20):
        average = close.rolling(span).mean()
        df[f'ma_{span}'] = average
        df[f'ma_signal_{span}'] = (close - average) / average

    # Bollinger bands: two rolling standard deviations around ma_20
    band = close.rolling(20).std() * 2
    df['bb_upper'] = df['ma_20'] + band
    df['bb_lower'] = df['ma_20'] - band
    df['bb_position'] = (close - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])

    # Lagged copies of the price and volume changes
    for shift_by in (1, 2, 3, 5):
        df[f'price_change_lag_{shift_by}'] = df['price_change'].shift(shift_by)
        df[f'volume_change_lag_{shift_by}'] = df['volume_change'].shift(shift_by)

    return df
def TrainModels(self):
    """Train all regime-specific models with class balance checking.

    Skipped during warm-up or with fewer than 100 stored bars. Features
    and labels are built from ``self.price_data``; rows with missing
    features, and the trailing ``self.prediction_horizon`` rows whose
    forward return is not known yet, are removed before training. Each
    regime model is fitted on an 80/20 split with its own scaler and its
    hold-out accuracy is stored in ``self.regime_accuracies``.
    ``self.trained`` is set once at least one model was fitted in this
    call. All failures are reported through ``Debug`` and do not
    propagate.
    """
    if self.is_warming_up:
        return
    if len(self.price_data) < 100:
        return

    try:
        # Convert to DataFrame and create enhanced features
        df = self.CreateEnhancedFeatures(pd.DataFrame(self.price_data))
        if df.empty:
            self.Debug("Insufficient clean data for training")
            return

        # Create labels
        labels = np.asarray(self.CreateLabels(df))

        # Prepare features
        self.feature_columns = self.GetFeatureColumns(df)
        features = df[self.feature_columns]

        # The last `prediction_horizon` rows have no forward return yet.
        # CreateLabels gives them the neutral class instead of NaN, so a
        # NaN test alone cannot remove them - exclude them by position.
        horizon = max(int(self.prediction_horizon), 0)
        label_known = np.arange(len(df)) < len(df) - horizon

        # Remove invalid data
        feature_ok = ~features.isna().any(axis=1).to_numpy()
        label_ok = ~np.isnan(labels.astype(float))
        valid_idx = feature_ok & label_ok & label_known
        features_clean = features[valid_idx]
        labels_clean = labels[valid_idx]

        if len(features_clean) < 50:
            self.Debug("Insufficient clean data for training")
            return

        # Check class distribution
        unique_classes, class_counts = np.unique(labels_clean, return_counts=True)
        min_class_count = np.min(class_counts)
        self.Debug(f"Class distribution: {dict(zip(unique_classes, class_counts))}")

        # Regimes whose model was actually fitted during this call
        trained_regimes = []

        # Train each regime model
        for regime_name, model in self.models.items():
            try:
                # For XGBoost, we need to be extra careful about class balance
                if regime_name == 'high_vol' and isinstance(model, xgb.XGBClassifier):
                    if min_class_count < 5:
                        self.Debug(f"Skipping XGBoost training for {regime_name} due to insufficient class balance")
                        continue
                    # Set num_class parameter explicitly
                    model.set_params(num_class=len(unique_classes))

                # Split data with stratification
                try:
                    X_train, X_test, y_train, y_test = train_test_split(
                        features_clean, labels_clean, test_size=0.2,
                        stratify=labels_clean, random_state=42
                    )
                except ValueError as e:
                    # If stratification fails, use random split
                    self.Debug(f"Stratification failed for {regime_name}, using random split: {str(e)}")
                    X_train, X_test, y_train, y_test = train_test_split(
                        features_clean, labels_clean, test_size=0.2, random_state=42
                    )

                # Additional check for training set class balance
                if len(np.unique(y_train)) < 2:
                    self.Debug(f"Insufficient classes in training set for {regime_name}")
                    continue

                scaler = self.scalers[regime_name]

                # Infinities become NaN, NaNs are imputed with the training
                # column means, and extreme values are clipped.
                imputer = SimpleImputer(strategy='mean')
                X_train = np.where(np.isinf(X_train), np.nan, X_train)
                X_train = np.clip(imputer.fit_transform(X_train), -1e6, 1e6)

                # The hold-out set goes through the *same* fitted imputer so
                # it is preprocessed exactly like the training set.
                X_test = np.where(np.isinf(X_test), np.nan, X_test)
                X_test = np.clip(imputer.transform(X_test), -1e6, 1e6)

                X_train_scaled = scaler.fit_transform(X_train)
                X_test_scaled = scaler.transform(X_test)

                # Train model with additional error handling
                try:
                    model.fit(X_train_scaled, y_train)
                    # Evaluate
                    y_pred = model.predict(X_test_scaled)
                    accuracy = accuracy_score(y_test, y_pred)
                    self.regime_accuracies[regime_name] = accuracy
                    trained_regimes.append(regime_name)
                    self.Debug(f"{regime_name} model accuracy: {accuracy:.4f}")
                except Exception as model_error:
                    self.Debug(f"Model fitting failed for {regime_name}: {str(model_error)}")
                    # Keep previous accuracy or set to default
                    self.regime_accuracies.setdefault(regime_name, 0.5)
                    continue
            except Exception as e:
                self.Debug(f"Failed to train {regime_name} model: {str(e)}")
                # Keep previous accuracy or set to default
                self.regime_accuracies.setdefault(regime_name, 0.5)
                continue

        # Report success only for models that were really fitted; the
        # default accuracies of 0.5 must not count as trained models.
        successful_models = len(trained_regimes)
        if successful_models > 0:
            self.trained = True
            self.Debug(f"Training completed. {successful_models} models trained successfully")
        else:
            self.Debug("No models trained successfully")
    except Exception as e:
        self.Debug(f"Model training failed: {str(e)}")
def _initialize_model(self, model_type):
    """Return a new, unfitted classifier for the requested model type.

    Recognised values of ``model_type`` are ``"xgboost"``,
    ``"random_forest"``, ``"gradient_boost"`` and ``"logistic"``; any
    other value yields a random forest.
    """
    def make_default():
        # Fallback for unrecognised model types
        return RandomForestClassifier(
            n_estimators=50,
            max_depth=8,
            random_state=42,
            min_samples_split=5,
            min_samples_leaf=2
        )

    # Factories are wrapped in lambdas so only the requested model is built
    factories = {
        "xgboost": lambda: xgb.XGBClassifier(
            n_estimators=50,
            max_depth=4,
            learning_rate=0.1,
            subsample=0.8,
            colsample_bytree=0.8,
            random_state=42,
            eval_metric='mlogloss',
            verbosity=0
        ),
        "random_forest": lambda: RandomForestClassifier(
            n_estimators=50,
            max_depth=8,
            random_state=42,
            n_jobs=-1,
            min_samples_split=5,
            min_samples_leaf=2
        ),
        "gradient_boost": lambda: GradientBoostingClassifier(
            n_estimators=50,
            learning_rate=0.1,
            max_depth=4,
            random_state=42,
            min_samples_split=5,
            min_samples_leaf=2
        ),
        "logistic": lambda: LogisticRegression(
            random_state=42,
            max_iter=500,
            C=1.0,
            solver='lbfgs',
            multi_class='ovr'
        ),
    }
    return factories.get(model_type, make_default)()
def CreateLabels(self, df):
    """Label every row by its forward return over the prediction horizon.

    The forward return is ``close`` shifted by ``self.prediction_horizon``
    rows relative to the current ``close``. Returns above the 70th
    percentile get class 2, returns below the 30th percentile get class 0
    and everything else - including rows whose forward return is NaN -
    gets class 1. The result is a NumPy integer array aligned with ``df``.
    """
    close = df['close']
    forward_return = close.shift(-self.prediction_horizon) / close - 1

    # Percentile cut-offs are computed on the known forward returns only
    lower, upper = np.percentile(forward_return.dropna(), [30, 70])

    # -1 / 0 / +1 direction, then shifted to the 0 / 1 / 2 class ids
    direction = np.where(forward_return > upper, 1,
                         np.where(forward_return < lower, -1, 0))
    return direction + 1
def GetFeatureColumns(self, df):
    """List the model feature names that exist as columns of ``df``.

    The result keeps the order of the master feature list below, not the
    column order of ``df``.
    """
    price_volume = (
        'price_change', 'price_change_abs', 'hl_ratio', 'oc_ratio',
        'volume_change', 'volume_price_trend',
    )
    windowed = tuple(
        f'{prefix}_{window}'
        for prefix in ('momentum', 'volatility', 'volume_ratio')
        for window in (3, 5, 10, 20)
    )
    indicators = ('rsi', 'ma_signal_5', 'ma_signal_10', 'ma_signal_20', 'bb_position')
    lagged = tuple(
        f'{prefix}_lag_{lag}'
        for prefix in ('price_change', 'volume_change')
        for lag in (1, 2, 3, 5)
    )
    calendar = (
        'month_sin', 'month_cos', 'hour_sin', 'hour_cos',
        'is_q1', 'is_q2', 'is_q3', 'is_q4',
    )
    regime = ('vol_regime', 'trend_regime', 'avg_iv', 'call_put_ratio')

    master = price_volume + windowed + indicators + lagged + calendar + regime
    return [name for name in master if name in df.columns]
def PredictSignal(self):
    """Make prediction using regime-appropriate model.

    Builds features from the last 100 stored bars, scales the newest row
    with the current regime's scaler and asks that regime's model for a
    class. The class is mapped back to -1/0/+1, multiplied by
    ``self.seasonal_multiplier`` and discretised again at +/-0.5.

    Returns:
        ``(signal, confidence)`` where ``signal`` is -1, 0 or 1. The
        confidence is the largest predicted class probability when the
        model offers one, otherwise the regime's stored accuracy.
        ``(0, 0.0)`` is returned whenever a prediction cannot be made.
    """
    if len(self.price_data) < self.lookback_periods:
        return 0, 0.0

    try:
        # Convert recent data to DataFrame and create features
        recent_data = pd.DataFrame(self.price_data[-100:])
        df_features = self.CreateEnhancedFeatures(recent_data)

        # Check if we have any data after feature creation
        if df_features.empty:
            self.Debug("No data after feature creation")
            return 0, 0.0

        # Check if we have the required feature columns
        if not self.feature_columns:
            self.Debug("No feature columns defined")
            return 0, 0.0

        # Get available features (in case some are missing)
        available_features = [col for col in self.feature_columns if col in df_features.columns]
        if not available_features:
            self.Debug("No available features found")
            return 0, 0.0

        # Prepare features
        features = df_features[available_features]
        if features.empty:
            self.Debug("Features dataframe is empty")
            return 0, 0.0

        # Only the newest row is scored
        last_row = features.iloc[-1:]
        if last_row.isnull().any().any():
            self.Debug("NaN values found in last row features")
            return 0, 0.0

        last_features = last_row.values
        if last_features.size == 0:
            self.Debug("Empty feature array")
            return 0, 0.0

        # Check if current regime model exists
        if self.current_regime not in self.models:
            self.Debug(f"Model for regime {self.current_regime} not found")
            return 0, 0.0

        model = self.models[self.current_regime]
        scaler = self.scalers[self.current_regime]

        # Check if scaler has been fitted (has scale_ attribute)
        if not hasattr(scaler, 'scale_'):
            self.Debug(f"Scaler for regime {self.current_regime} not fitted")
            return 0, 0.0

        # Additional safety check: ensure feature dimensions match
        if len(available_features) != len(scaler.scale_):
            self.Debug(f"Feature dimension mismatch: {len(available_features)} vs {len(scaler.scale_)}")
            return 0, 0.0

        # Scale features and predict
        scaled_features = scaler.transform(last_features)
        mapped_prediction = model.predict(scaled_features)[0]

        # Confidence: class probability when available, else stored accuracy
        confidence = self.regime_accuracies[self.current_regime]
        if hasattr(model, 'predict_proba'):
            try:
                confidence = np.max(model.predict_proba(scaled_features)[0])
            except Exception:
                # Best effort: keep the accuracy fallback, but do not hide
                # KeyboardInterrupt/SystemExit the way a bare except would.
                confidence = self.regime_accuracies[self.current_regime]

        # Map back to original labels
        reverse_mapping = {0: -1, 1: 0, 2: 1}
        prediction = reverse_mapping.get(mapped_prediction, 0)

        # Apply seasonal adjustment, then convert back to a discrete signal
        adjusted_prediction = prediction * self.seasonal_multiplier
        if adjusted_prediction > 0.5:
            final_prediction = 1
        elif adjusted_prediction < -0.5:
            final_prediction = -1
        else:
            final_prediction = 0

        return final_prediction, confidence
    except Exception as e:
        self.Debug(f"Prediction failed: {str(e)}")
        return 0, 0.0
def FindBestOption(self, option_right, target_delta=None):
    """Pick the highest-scoring cached option of the requested right.

    Candidates come from the call or put cache depending on
    ``option_right``. Options without a positive bid, ask and volume, or
    with a bid/ask spread above 1.0, are skipped. The score is
    40% delta closeness (to ``target_delta`` when given), 30% volume
    (saturating at 500) and 30% spread tightness.

    Returns:
        The stored option dict with the best score, or ``None`` when no
        candidate qualifies.
    """
    is_call = option_right == OptionRight.Call
    candidates = self.current_call_options if is_call else self.current_put_options
    if not candidates:
        return None

    winner = None
    top_score = -np.inf
    for option in candidates.values():
        try:
            bid = option.get('bid', 0)
            ask = option.get('ask', 0)
            delta = option.get('delta', 0)
            volume = option.get('volume', 0)

            # Skip illiquid/broken quotes and overly wide spreads
            if bid <= 0 or ask <= 0 or volume <= 0 or ask - bid > 1.0:
                continue

            # Closeness of |delta| to the target, floored at zero
            if target_delta is None:
                delta_part = 1.0
            else:
                delta_part = max(0, 1 - abs(abs(delta) - abs(target_delta)))

            volume_part = min(volume / 500.0, 1.0)
            spread_part = 1.0 / (1 + (ask - bid))

            total = 0.4 * delta_part + 0.3 * volume_part + 0.3 * spread_part
            if total > top_score:
                top_score, winner = total, option
        except Exception as e:
            self.Debug(f"Option scoring failed: {e}")
            continue

    return winner
def ExecuteOptionsTrading(self):
    """Turn the current model signal into at most one new option trade.

    Runs only after warm-up and training, inside the allowed time window
    and while the open-position limits permit. The prediction and its
    confidence are stored on the instance; a call or put is bought through
    ``TradeOption`` when the confidence exceeds the active threshold.
    """
    if self.is_warming_up or not self.trained:
        return

    # Time of day as fractional hours
    clock = self.Time.hour + self.Time.minute / 60.0

    # Outside the entry window [9.58, 15.5) nothing is opened
    if not (9.58 <= clock < 15.5):
        return

    # Intraday-only mode stops opening positions from 14.5 onwards
    if self.intraday_only and clock >= 14.5:
        return

    # With positions open: no additions unless overnight trading is allowed,
    # and never more than three positions
    open_count = len(self.option_positions)
    if open_count > 0 and (not self.allow_overnight_trades or open_count >= 3):
        return

    signal, conviction = self.PredictSignal()
    self.last_prediction = signal
    self.prediction_confidence = conviction

    # A stricter bar applies while overnight trading is enabled
    if self.allow_overnight_trades:
        required = 0.8
    else:
        required = self.min_confidence_threshold
    if conviction <= required:
        return

    equity = self.Portfolio.TotalPortfolioValue
    regime_weight = self.regime_accuracies[self.current_regime]
    budget = equity * self.max_options_allocation * conviction * regime_weight

    if signal == 1:
        pick = self.FindBestOption(OptionRight.Call, target_delta=0.5)
        if pick:
            self.TradeOption(pick, budget, "BUY_CALL")
    elif signal == -1:
        pick = self.FindBestOption(OptionRight.Put, target_delta=-0.5)
        if pick:
            self.TradeOption(pick, budget, "BUY_PUT")

    # Enhanced logging
    self.Debug(f"Options Signal - Regime: {self.current_regime} | Prediction: {signal} | Confidence: {conviction:.3f} | Overnight: {self.allow_overnight_trades}")
def TradeOption(self, option_info, max_allocation, trade_type):
    """Option trading with better stop loss tracking.

    Buys ``self.quantity`` contracts of the given option with a market
    order and records the position in ``self.option_positions``. The trade
    is skipped when the quote is invalid, the symbol is already tracked,
    the position limit is reached or cash does not cover the estimated
    cost of the whole order plus a 10% buffer.

    Args:
        option_info: option record holding at least ``symbol``, ``bid``
            and ``ask``.
        max_allocation: allocation computed by the caller; it is accepted
            for interface compatibility and not used for sizing here.
        trade_type: only values starting with ``"BUY"`` are executed.
    """
    try:
        symbol = option_info['symbol']
        bid = option_info['bid']
        ask = option_info['ask']

        if bid <= 0 or ask <= 0:
            self.Debug(f"TradeOption aborted: invalid bid/ask for {symbol}")
            return

        mid_price = (bid + ask) / 2
        if mid_price <= 0:
            return

        # Check if we already have this position
        if symbol in self.option_positions:
            self.Debug(f"TradeOption aborted: already have position in {symbol}")
            return

        # Check position limits based on overnight setting
        max_positions = 3 if self.allow_overnight_trades else 1
        if len(self.option_positions) >= max_positions:
            self.Debug(f"TradeOption aborted: already have {len(self.option_positions)} open positions (max: {max_positions})")
            return

        quantity = self.quantity

        # Cash check for the *whole* order: 100 shares per contract times
        # the number of contracts, so raising self.quantity keeps the
        # check meaningful.
        required_cash = mid_price * 100 * quantity
        if self.Portfolio.Cash < required_cash * 1.1:  # 10% buffer
            self.Debug(f"TradeOption aborted: insufficient cash. Need ${required_cash:.2f}, have ${self.Portfolio.Cash:.2f}")
            return

        # Confirm we're not selling naked
        if not trade_type.startswith("BUY"):
            self.Debug(f"TradeOption rejected: unsupported trade_type {trade_type}")
            return

        ticket = self.MarketOrder(symbol, quantity)

        # Only add to tracking if order was successfully placed
        if not ticket:
            self.Debug(f"Failed to place order for {symbol}")
            return

        self.option_positions[symbol] = {
            'entry_price': mid_price,
            'entry_time': self.Time,
            'quantity': quantity,
            'trade_type': trade_type,
            'target_profit': mid_price * (1 + self.profit_target),
            'stop_loss': mid_price * (1 + self.options_stop_loss),  # Use options-specific stop loss
            'regime': self.current_regime,
            'confidence': self.prediction_confidence,
            'ticket': ticket,
            'allow_overnight': self.allow_overnight_trades  # Track overnight setting
        }
        self.Debug(f"Trade placed: {trade_type} {quantity}x {symbol} @ {mid_price:.2f} | Stop: {mid_price * (1 + self.options_stop_loss):.2f}")
    except Exception as e:
        self.Debug(f"Trade execution error: {str(e)}")
def ManageOptionsPositions(self):
    """Position management with configurable stop loss and overnight controls.

    For every tracked option position this checks, in order: profit
    target, options stop loss, expiry within a day, the intraday-only
    end-of-day cut-off, the no-overnight end-of-day cut-off and a regime
    change (the last two only while overnight trading is disabled). The
    first rule that fires liquidates the position. Positions that are no
    longer held, were liquidated, or raised an error during the checks are
    removed from ``self.option_positions``.
    """
    if not self.option_positions:
        return

    positions_to_close = []
    current_hour_minute = self.Time.hour + self.Time.minute / 60.0
    past_eod = current_hour_minute >= self.eod_liquidation_time

    for symbol, position_info in self.option_positions.items():
        try:
            # Check if we still hold the position
            if not self.Portfolio[symbol].Invested:
                positions_to_close.append(symbol)
                self.Debug(f"Position {symbol} no longer held - removing from tracking")
                continue

            security = self.Securities[symbol]

            # Get current option price; a zero price means no usable quote
            current_price = security.Price
            entry_price = position_info['entry_price']
            if current_price == 0:
                continue

            # Calculate P&L
            pnl_percent = (current_price - entry_price) / entry_price

            # The expiry test is folded into its own condition. Testing only
            # for the presence of an Expiry attribute would end the chain
            # for every option and the rules below it would never run.
            expiring = (
                hasattr(security, 'Expiry')
                and (security.Expiry - self.Time).days <= 0
            )

            exit_reason = ""
            if pnl_percent >= self.profit_target:
                exit_reason = "PROFIT_TARGET"
            elif pnl_percent <= self.options_stop_loss:
                exit_reason = "OPTIONS_STOP_LOSS"
            elif expiring:
                exit_reason = "EXPIRATION"
            elif self.intraday_only and past_eod:
                exit_reason = "INTRADAY_ONLY_EOD"
            elif not self.allow_overnight_trades and past_eod:
                exit_reason = "NO_OVERNIGHT_ALLOWED"
            elif not self.allow_overnight_trades and position_info['regime'] != self.current_regime:
                exit_reason = "REGIME_CHANGE"

            # Execute exit
            if exit_reason:
                self.Liquidate(symbol)
                positions_to_close.append(symbol)
                self.Debug(f"Options Exit - {exit_reason}: {symbol} | P&L: {pnl_percent:.2%} | Price: {current_price:.2f}")
        except Exception as e:
            self.Debug(f"Position management failed for {symbol}: {str(e)}")
            positions_to_close.append(symbol)

    # Clean up closed positions (done after the loop so the dict is not
    # modified while it is being iterated)
    for symbol in positions_to_close:
        if symbol in self.option_positions:
            del self.option_positions[symbol]
def AdjustOptionsParameters(self):
    """Re-tune the option stop loss and the overnight flag.

    Does nothing until ``self.price_data`` holds at least 50 entries.
    Otherwise:

    * the standard deviation of the percentage changes of the last 50
      ``'close'`` values selects ``self.options_stop_loss``: the -0.35
      base, scaled by 1.2 above 0.02 or by 0.8 below 0.01;
    * ``self.allow_overnight_trades`` is set to False when
      ``self.regime_confidence`` is below 0.4, and to True otherwise
      unless ``self.never_overnight`` is set;
    * a message is emitted only when the overnight flag changed.

    Any exception is reported through ``self.Debug`` and swallowed.
    """
    try:
        window = 50
        if len(self.price_data) < window:
            return

        # Volatility of the most recent closes drives the stop distance.
        frame = pd.DataFrame(self.price_data[-window:])
        if 'close' in frame.columns:
            volatility = frame['close'].pct_change().std()
            default_stop = -0.35
            if volatility > 0.02:
                # High volatility: wider stop loss.
                chosen_stop = default_stop * 1.2
            elif volatility < 0.01:
                # Low volatility: tighter stop loss.
                chosen_stop = default_stop * 0.8
            else:
                chosen_stop = default_stop
            self.options_stop_loss = chosen_stop

        # Remember the flag so that only transitions get logged.
        status_before = getattr(self, 'allow_overnight_trades', None)
        if self.regime_confidence < 0.4:
            self.allow_overnight_trades = False
        elif not self.never_overnight:
            self.allow_overnight_trades = True

        if self.allow_overnight_trades == status_before:
            return
        if self.allow_overnight_trades:
            self.debug(f"Overnight trades enabled according to regime confidence: {self.regime_confidence}")
        else:
            self.debug("Overnight trades disabled due to low regime confidence")
    except Exception as e:
        self.Debug(f"Parameter adjustment failed: {str(e)}")
def OnOrderEvent(self, orderEvent):
    """Keep ``self.option_positions`` in sync with fills and rejections.

    * Filled: logs the fill. For a buy order on a tracked symbol the
      stored ``entry_price``, ``target_profit`` and ``stop_loss`` are
      re-based on the actual fill price. Sell fills are ignored here so
      that closing a position cannot overwrite its recorded entry.
    * Canceled / Invalid: logs the event and stops tracking the symbol
      only when the portfolio no longer holds it. A rejected exit order
      therefore leaves the open position under management.

    Other statuses are ignored.

    Args:
        orderEvent: event object supplied by the engine; this method reads
            its ``Status``, ``OrderId``, ``Symbol``, ``FillPrice`` and
            ``Message``.
    """
    status = orderEvent.Status
    symbol = orderEvent.Symbol

    if status == OrderStatus.Filled:
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        self.Debug(f"Order Filled: {order.Symbol} | Qty: {order.Quantity} @ {orderEvent.FillPrice}")

        tracked = self.option_positions.get(order.Symbol)
        # Tracked positions are long: the target sits above the entry price,
        # so only positive-quantity (buy) orders open or add to them.
        if tracked is not None and order.Quantity > 0:
            fill_price = orderEvent.FillPrice
            tracked['entry_price'] = fill_price
            tracked['target_profit'] = fill_price * (1 + self.profit_target)
            # NOTE(review): this level uses self.stop_loss, while the exit
            # check compares P&L with self.options_stop_loss - confirm which
            # one is intended.
            tracked['stop_loss'] = fill_price * (1 + self.stop_loss)
        return

    if status == OrderStatus.Canceled:
        self.Debug(f"Order Canceled: {orderEvent.Symbol} | Reason: {orderEvent.Message}")
    elif status == OrderStatus.Invalid:
        self.Debug(f"Order Invalid: {orderEvent.Symbol} | Error: {orderEvent.Message}")
    else:
        return

    # A failed entry leaves nothing to manage; a failed exit leaves the
    # position open, and it must keep its stop/target tracking.
    if symbol in self.option_positions and not self.Portfolio[symbol].Invested:
        del self.option_positions[symbol]
def DailyEODProcessing(self):
    """Log end-of-day figures and apply the overnight holding rules.

    Skipped entirely while the algorithm is warming up. Otherwise it:

    1. logs portfolio value, cash, the summed holdings value of the
       tracked option positions, the number of tracked positions, the
       current regime and the overnight flag;
    2. once the time of day (in fractional hours) has reached
       ``self.eod_liquidation_time``, liquidates and stops tracking every
       tracked position for which one of these holds, checked in order:
       expiry reached, ``self.intraday_only`` set, or
       ``self.allow_overnight_trades`` unset;
    3. appends a record to ``self.seasonal_history`` when
       ``self.last_prediction`` exists and is non-zero, keeping only the
       newest 100 records.

    The expiry test is evaluated on its own, so a security that merely
    exposes ``Expiry`` still falls through to the intraday and overnight
    rules when it is not expiring.

    Any exception is reported through ``self.Debug`` and swallowed.
    """
    if self.is_warming_up:
        return
    try:
        # Performance metrics
        portfolio_value = self.Portfolio.TotalPortfolioValue
        cash = self.Portfolio.Cash
        options_value = sum(
            self.Portfolio[symbol].HoldingsValue for symbol in self.option_positions
        )

        # Log daily performance
        self.Debug(f"EOD - Portfolio: ${portfolio_value:.0f} | Cash: ${cash:.0f} | Options: ${options_value:.0f}")
        self.Debug(f"EOD - Active Positions: {len(self.option_positions)} | Regime: {self.current_regime} | Overnight: {self.allow_overnight_trades}")

        # Time of day as fractional hours (15:30 becomes 15.5).
        current_hour_minute = self.Time.hour + self.Time.minute / 60.0
        if current_hour_minute >= self.eod_liquidation_time:
            # Iterate over a snapshot: entries are removed inside the loop.
            for symbol in list(self.option_positions):
                reason = ""
                expiry = getattr(self.Securities[symbol], 'Expiry', None)
                # timedelta.days rounds down, so an expiry less than 24 hours
                # ahead already yields 0 here.
                if expiry is not None and (expiry - self.Time).days <= 0:
                    reason = "EXPIRATION_RISK"
                elif self.intraday_only:
                    reason = "INTRADAY_ONLY_MODE"
                elif not self.allow_overnight_trades:
                    reason = "NO_OVERNIGHT_ALLOWED"

                if reason:
                    self.Liquidate(symbol)
                    # The order-event handler may already have removed it.
                    self.option_positions.pop(symbol, None)
                    self.Debug(f"EOD Liquidation: {symbol} - {reason}")

        # Update performance tracking for seasonal adjustment
        if getattr(self, 'last_prediction', 0) != 0:
            self.seasonal_history.append({
                'month': self.Time.month,
                'prediction_accuracy': self.regime_accuracies[self.current_regime],
                'timestamp': self.Time,
            })
            # Keep seasonal history manageable
            if len(self.seasonal_history) > 100:
                self.seasonal_history = self.seasonal_history[-100:]
    except Exception as e:
        self.Debug(f"End of day processing failed: {str(e)}")
def OnEndOfAlgorithm(self):
    """Liquidate everything and print the final performance summary.

    Logs the final portfolio value, the total return relative to the
    starting cash, the final regime and the accuracy stored for each
    regime in ``self.regime_accuracies``.

    Liquidation and reporting are guarded separately, so a failure while
    liquidating is reported through ``self.Debug`` without suppressing
    the summary.
    """
    try:
        # Liquidate all positions
        self.Liquidate()
    except Exception as e:
        self.Debug(f"Algorithm end processing failed: {str(e)}")

    try:
        # Final performance summary
        final_value = self.Portfolio.TotalPortfolioValue
        # Must equal the amount passed to SetCash in Initialize (25000);
        # any other value misstates the reported return.
        initial_value = 25000
        total_return = (final_value - initial_value) / initial_value
        self.Debug("Algorithm completed.")
        self.Debug(f"Final Portfolio Value: ${final_value:.2f}")
        self.Debug(f"Total Return: {total_return:.2%}")
        self.Debug(f"Final Regime: {self.current_regime}")

        # Model performance summary
        self.Debug("Regime Model Accuracies:")
        for regime, accuracy in self.regime_accuracies.items():
            self.Debug(f" {regime}: {accuracy:.4f}")
    except Exception as e:
        self.Debug(f"Algorithm end processing failed: {str(e)}")