| Overall Statistics |
|
Total Orders 903 Average Win 1.88% Average Loss -0.57% Compounding Annual Return 10.580% Drawdown 18.600% Expectancy 0.232 Start Equity 100000 End Equity 172318.9 Net Profit 72.319% Sharpe Ratio 0.386 Sortino Ratio 0.354 Probabilistic Sharpe Ratio 11.759% Loss Rate 71% Win Rate 29% Profit-Loss Ratio 3.31 Alpha 0.026 Beta 0.328 Annual Standard Deviation 0.135 Annual Variance 0.018 Information Ratio -0.156 Tracking Error 0.171 Treynor Ratio 0.159 Total Fees $1941.45 Estimated Strategy Capacity $1700000000.00 Lowest Capacity Asset ES YTG30NVEFCW1 Portfolio Turnover 79.55% |
# region imports
from AlgorithmImports import *
# endregion
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Indicators")
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from collections import deque, defaultdict
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Data.Consolidators import *
from QuantConnect.Orders import *
from QuantConnect.Python import PythonQuandl
# TensorFlow and related imports
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, LSTM, Input, RepeatVector, TimeDistributed, Dropout, Concatenate
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler
class MediumFrequencyTickAnalysis(QCAlgorithm):
def Initialize(self):
"""Initialize the algorithm and set required parameters"""
self.SetStartDate(2020, 1, 1) # Adjust dates based on data availability
self.SetEndDate(2022, 12, 31) # Adjust dates based on data availability
self.SetCash(100000) # Set strategy capital
# Add ticker(s) to track - symbols with high liquidity work best
self.symbol = self.AddEquity("SPY", Resolution.Tick).Symbol
# Set up consolidators for multiple timeframes
self.minute_bars = {}
self.hour_bars = {}
self.daily_bars = {}
# Create consolidators
tick_consolidator = TickConsolidator(timedelta(minutes=1))
minute_consolidator = TickConsolidator(timedelta(hours=1))
hour_consolidator = TickConsolidator(timedelta(days=1))
# Register consolidator handlers
self.SubscriptionManager.AddConsolidator(self.symbol, tick_consolidator)
tick_consolidator.DataConsolidated += self.OnMinuteBarHandler
self.minute_consolidator = minute_consolidator
self.SubscriptionManager.AddConsolidator(self.symbol, minute_consolidator)
minute_consolidator.DataConsolidated += self.OnHourBarHandler
self.hour_consolidator = hour_consolidator
self.SubscriptionManager.AddConsolidator(self.symbol, hour_consolidator)
hour_consolidator.DataConsolidated += self.OnDailyBarHandler
# Set up algorithm parameters
self.tick_buffer_size = 10000 # Number of ticks to store for analysis
self.analysis_window = 3000 # Number of ticks to analyze for patterns
self.aggregation_minutes = 15 # Minutes to aggregate tick data
self.training_window = 10 # Number of aggregation periods for training
self.persistence_threshold = 1 # How many consecutive anomalies to require
self.anomaly_threshold = 0.6 # Threshold for anomaly detection (percentile)
self.position_size = 0.10 # Position size as fraction of portfolio
# Model hyperparameters
self.encoding_dim = 32 # Size of the encoded representation
self.learning_rate = 0.001 # Learning rate for model training
# Initialize data structures
self.tick_data = deque(maxlen=self.tick_buffer_size)
self.aggregated_features = deque(maxlen=100)
self.volume_profile = defaultdict(float) # Price -> Volume mapping
self.order_flow_imbalance = deque(maxlen=50)
self.smart_money_indicators = deque(maxlen=50)
self.anomaly_scores = []
self.consecutive_anomalies = 0
# Tracking state
self.model = None
self.is_model_trained = False
self.scalers = {}
self.last_aggregation_time = None
self.in_position = False
self.entry_price = 0
self.position_start_time = None
self.max_position_duration = timedelta(days=5) # Holding period for medium-frequency
# Schedule model training and feature aggregation
self.Schedule.On(self.DateRules.EveryDay(self.symbol),
self.TimeRules.AfterMarketOpen(self.symbol, 30),
self.TrainModel)
# Schedule feature aggregation every X minutes
self.Schedule.On(self.DateRules.EveryDay(self.symbol),
self.TimeRules.Every(TimeSpan.FromMinutes(self.aggregation_minutes)),
self.AggregateFeatures)
# Market regime detection runs daily
self.Schedule.On(self.DateRules.EveryDay(self.symbol),
self.TimeRules.BeforeMarketClose(self.symbol, 30),
self.DetectMarketRegime)
def OnMinuteBarHandler(self, sender, consolidated):
"""Handler for consolidated minute bars"""
symbol = consolidated.Symbol
self.minute_bars[symbol] = consolidated
self.Debug(f"Received minute bar for {symbol}")
def OnHourBarHandler(self, sender, consolidated):
"""Handler for consolidated hour bars"""
symbol = consolidated.Symbol
self.hour_bars[symbol] = consolidated
self.Debug(f"Received Hour bar for {symbol}")
def OnDailyBarHandler(self, sender, consolidated):
"""Handler for consolidated daily bars"""
symbol = consolidated.Symbol
self.daily_bars[symbol] = consolidated
self.Debug(f"Received Daily bar for {symbol}")
def OnData(self, data):
"""Process incoming tick data"""
if self.symbol not in data.Ticks:
return
ticks = data.Ticks[self.symbol]
if not ticks or len(ticks) == 0:
return
# Process each new tick
for tick in ticks:
self.ProcessTick(tick)
def ProcessTick(self, tick):
"""Process a single tick and update our data structures"""
# Extract features from tick
tick_features = self.ExtractTickFeatures(tick)
# Store in our buffer
self.tick_data.append(tick_features)
# Update volume profile
price_bucket = round(tick.Price, 2) # Round to 2 decimal places for bucketing
self.volume_profile[price_bucket] += tick.Quantity
# If we have enough data and model is trained, check for trade opportunities
if self.is_model_trained and len(self.aggregated_features) >= self.training_window:
self.EvaluateTradeOpportunity()
def ExtractTickFeatures(self, tick):
"""Extract comprehensive features from a single tick"""
features = {
'price': float(tick.Price),
'volume': float(tick.Quantity),
'bid_price': float(tick.BidPrice),
'ask_price': float(tick.AskPrice),
'timestamp': tick.Time.timestamp(),
'bid_size': float(tick.BidSize) if hasattr(tick, 'BidSize') else 0,
'ask_size': float(tick.AskSize) if hasattr(tick, 'AskSize') else 0,
'exchange': tick.Exchange if hasattr(tick, 'Exchange') else "",
'is_buy': 1 if tick.Price >= tick.AskPrice else (0 if tick.Price <= tick.BidPrice else 0.5)
}
# Add derived features if we have enough history
if len(self.tick_data) > 0:
last_tick = self.tick_data[-1]
features['price_delta'] = features['price'] - last_tick['price']
features['volume_delta'] = features['volume'] - last_tick['volume']
features['spread'] = features['ask_price'] - features['bid_price']
features['mid_price'] = (features['ask_price'] + features['bid_price']) / 2
features['time_delta'] = features['timestamp'] - last_tick['timestamp']
# Order flow imbalance indicators
features['bid_ask_imbalance'] = (features['bid_size'] - features['ask_size']) / (features['bid_size'] + features['ask_size'] + 1e-10)
features['trade_imbalance'] = 1 if features['is_buy'] > 0.5 else (-1 if features['is_buy'] < 0.5 else 0)
# Volatility indicators
if len(self.tick_data) >= 100:
recent_prices = [t['price'] for t in list(self.tick_data)[-100:]]
features['rolling_volatility'] = np.std(recent_prices)
features['price_velocity'] = np.mean([t['price_delta'] for t in list(self.tick_data)[-20:]])
else:
# Default values if no history
features['price_delta'] = 0
features['volume_delta'] = 0
features['spread'] = features['ask_price'] - features['bid_price']
features['mid_price'] = (features['ask_price'] + features['bid_price']) / 2
features['time_delta'] = 0
features['bid_ask_imbalance'] = 0
features['trade_imbalance'] = 0
features['rolling_volatility'] = 0
features['price_velocity'] = 0
return features
def AggregateFeatures(self):
"""Aggregate tick features into medium-frequency metrics"""
if len(self.tick_data) < 100: # Need sufficient data
return
current_time = self.Time
self.last_aggregation_time = current_time
# Get recent ticks for analysis
recent_ticks = list(self.tick_data)[-self.analysis_window:]
if len(recent_ticks) < self.analysis_window // 2:
return
# Create a DataFrame for easier analysis
df = pd.DataFrame(recent_ticks)
# Order Flow Imbalance metrics
order_flow = {
'buy_volume': df[df['is_buy'] > 0.5]['volume'].sum(),
'sell_volume': df[df['is_buy'] < 0.5]['volume'].sum(),
'neutral_volume': df[df['is_buy'] == 0.5]['volume'].sum(),
'bid_ask_imbalance_mean': df['bid_ask_imbalance'].mean(),
'bid_ask_imbalance_std': df['bid_ask_imbalance'].std(),
'trade_imbalance_sum': df['trade_imbalance'].sum(),
'num_buys': (df['is_buy'] > 0.5).sum(),
'num_sells': (df['is_buy'] < 0.5).sum()
}
# Calculate order flow imbalance ratio (-1 to 1 scale)
total_volume = order_flow['buy_volume'] + order_flow['sell_volume'] + 1e-10
order_flow['imbalance_ratio'] = (order_flow['buy_volume'] - order_flow['sell_volume']) / total_volume
self.order_flow_imbalance.append(order_flow['imbalance_ratio'])
# Volume Profile metrics
volume_profile = {
'total_volume': df['volume'].sum(),
'weighted_avg_price': (df['price'] * df['volume']).sum() / (df['volume'].sum() + 1e-10),
'volume_weighted_volatility': np.sqrt((df['volume'] * (df['price'] - df['price'].mean())**2).sum() / (df['volume'].sum() + 1e-10)),
'price_range': df['price'].max() - df['price'].min(),
'volume_concentration': df.groupby(df['price'].round(2))['volume'].sum().max() / (df['volume'].sum() + 1e-10)
}
# Smart Money Indicators
# 1. Large trades detection
large_trade_threshold = np.percentile(df['volume'], 95)
large_trades = df[df['volume'] > large_trade_threshold]
smart_money = {
'large_trade_ratio': large_trades['volume'].sum() / (df['volume'].sum() + 1e-10),
'large_trade_imbalance': (large_trades[large_trades['is_buy'] > 0.5]['volume'].sum() -
large_trades[large_trades['is_buy'] < 0.5]['volume'].sum()) /
(large_trades['volume'].sum() + 1e-10),
'spread_trend': df['spread'].iloc[-100:].mean() / (df['spread'].iloc[:100].mean() + 1e-10) - 1,
'iceberg_indicator': df['volume'].std() / (df['volume'].mean() + 1e-10),
'price_impact': np.corrcoef(df['volume'], df['price_delta'])[0, 1] if len(df) > 1 else 0
}
# Combine all features
agg_features = {**order_flow, **volume_profile, **smart_money}
# Add market context using consolidated bars
if self.symbol in self.minute_bars and self.symbol in self.hour_bars and self.symbol in self.daily_bars:
minute_bar = self.minute_bars[self.symbol]
hour_bar = self.hour_bars[self.symbol]
daily_bar = self.daily_bars[self.symbol]
# Daily performance (today's return)
if daily_bar is not None and daily_bar.Open > 0:
agg_features['daily_return'] = (daily_bar.Close - daily_bar.Open) / daily_bar.Open
else:
agg_features['daily_return'] = 0
# Calculate SMA50 manually from daily bars (alternative to the removed SMA call)
if self.symbol in self.daily_bars:
# This is a simplified approach - in production you would maintain a proper history window
agg_features['price_to_sma50'] = 1.0 # Default value
agg_features['market_hours_progress'] = (current_time.hour * 60 + current_time.minute - 9 * 60 - 30) / (16 * 60 - 9 * 60 - 30)
agg_features['day_of_week'] = current_time.weekday()
# Store aggregated features
self.aggregated_features.append(agg_features)
self.smart_money_indicators.append(smart_money['large_trade_imbalance'])
# Log interesting insights
self.Debug(f"Order Flow Imbalance: {order_flow['imbalance_ratio']:.4f}, " +
f"Large Trade Imbalance: {smart_money['large_trade_imbalance']:.4f}, " +
f"Volume Concentration: {volume_profile['volume_concentration']:.4f}")
def DetectMarketRegime(self):
"""Detect the current market regime based on tick behavior"""
if len(self.aggregated_features) < 10:
return
# Extract recent aggregated features
recent_features = list(self.aggregated_features)[-10:]
df = pd.DataFrame(recent_features)
# Calculate regime indicators
volatility_regime = np.mean([f['volume_weighted_volatility'] for f in recent_features])
order_flow_regime = np.std([f['imbalance_ratio'] for f in recent_features])
smart_money_regime = np.mean([f['large_trade_imbalance'] for f in recent_features])
# Determine overall regime
regime = {
'volatility': 'high' if volatility_regime > 0.05 else 'medium' if volatility_regime > 0.02 else 'low',
'order_flow': 'balanced' if abs(order_flow_regime) < 0.3 else 'buy_dominated' if order_flow_regime > 0 else 'sell_dominated',
'smart_money': 'accumulating' if smart_money_regime > 0.2 else 'distributing' if smart_money_regime < -0.2 else 'neutral'
}
self.Debug(f"Market Regime: Volatility={regime['volatility']}, Order Flow={regime['order_flow']}, Smart Money={regime['smart_money']}")
# Store regime information for model input and trading decisions
self.current_regime = regime
def NormalizeFeatures(self, features_list):
"""Normalize aggregated features for model input"""
if not features_list or len(features_list) == 0:
return None
# Convert to DataFrame
df = pd.DataFrame(features_list)
# Ensure we have all expected columns
for col in df.columns:
# Initialize scaler if not exists
if col not in self.scalers:
self.scalers[col] = StandardScaler()
self.scalers[col].fit(df[[col]])
# Apply scaling
df[col] = self.scalers[col].transform(df[[col]])
# Fill any NaN values
df = df.fillna(0)
return df.values
def TrainModel(self):
"""Train the anomaly detection model on aggregated features"""
if len(self.aggregated_features) < self.training_window:
self.Debug(f"Not enough aggregated data for training. Have {len(self.aggregated_features)} periods, need {self.training_window}")
return
self.Debug("Training market microstructure model...")
# Normalize aggregated features
train_data = self.NormalizeFeatures(list(self.aggregated_features))
if train_data is None or len(train_data) == 0:
self.Debug("Failed to normalize training data")
return
# Get dimensions
n_samples, n_features = train_data.shape
# Build autoencoder model for anomaly detection
self.model = self.BuildAutoencoderModel(n_features)
# Train the model
try:
early_stopping = EarlyStopping(monitor='loss', patience=3, mode='min')
self.model.fit(
train_data, train_data,
epochs=50,
batch_size=8,
shuffle=True,
callbacks=[early_stopping],
verbose=0
)
self.is_model_trained = True
self.Debug("Model training completed successfully")
except Exception as e:
self.Debug(f"Error training model: {str(e)}")
def BuildAutoencoderModel(self, n_features):
"""Build an advanced autoencoder model for market microstructure anomaly detection"""
# Input layer
input_layer = Input(shape=(n_features,))
# Encoder
encoded = Dense(64, activation='relu')(input_layer)
encoded = Dropout(0.2)(encoded)
encoded = Dense(32, activation='relu')(encoded)
# Bottleneck layer
bottleneck = Dense(self.encoding_dim, activation='relu')(encoded)
# Decoder
decoded = Dense(32, activation='relu')(bottleneck)
decoded = Dropout(0.2)(decoded)
decoded = Dense(64, activation='relu')(decoded)
# Output layer
output_layer = Dense(n_features, activation='linear')(decoded)
# Build model
autoencoder = Model(input_layer, output_layer)
autoencoder.compile(
optimizer=Adam(learning_rate=self.learning_rate),
loss='mse'
)
return autoencoder
def DetectAnomaly(self, features):
"""Detect anomalies using the trained model"""
# Normalize input
normalized_features = self.NormalizeFeatures([features])
if normalized_features is None:
return 0
# Get reconstruction
reconstructed = self.model.predict(normalized_features, verbose=0)
# Calculate reconstruction error
mse = np.mean(np.square(normalized_features - reconstructed))
# Return anomaly score
return mse
def EvaluateTradeOpportunity(self):
"""Evaluate current market conditions for potential trade entry or exit"""
if not self.is_model_trained or not self.aggregated_features:
return
# Get most recent aggregated features
current_features = self.aggregated_features[-1]
# Calculate anomaly score
anomaly_score = self.DetectAnomaly(current_features)
self.anomaly_scores.append(anomaly_score)
# Determine if current anomaly score is high enough to be classified as anomaly
is_anomaly = False
if len(self.anomaly_scores) > 5: # (reduced from 20) Need enough history to establish baseline
threshold = np.percentile(self.anomaly_scores[-5:], self.anomaly_threshold * 100)
is_anomaly = anomaly_score > threshold
self.Debug(f"Anomaly detection: score={anomaly_score:.4f}, threshold={threshold:.4f}, is_anomaly={is_anomaly}")
else:
self.Debug(f"Not enough anomaly history: {len(self.anomaly_scores)}/5")
# Add absolute threshold as backup
if not is_anomaly and anomaly_score > 0.4: # Add an absolute threshold
is_anomaly = True
self.Debug(f"Anomaly detected using absolute threshold: {anomaly_score}")
# Debug Statement
self.Debug(f"Current state: model_trained={self.is_model_trained}, consecutive_anomalies={self.consecutive_anomalies}, anomaly_score={anomaly_score if 'anomaly_score' in locals() else 'N/A'}")
# Track consecutive anomalies for persistence
if is_anomaly:
self.consecutive_anomalies += 1
else:
self.consecutive_anomalies = 0
# Log anomaly information
if is_anomaly:
self.Debug(f"Anomaly detected - Score: {anomaly_score:.6f}, Consecutive: {self.consecutive_anomalies}")
# Manage existing position
if self.in_position:
self.Debug("Executing self.ManageExistingPosition")
self.ManageExistingPosition()
# Check for entry opportunity if persistent anomaly pattern detected
elif self.consecutive_anomalies >= self.persistence_threshold:
self.Debug("Executing self.EvaluateEntryOpportunity")
self.EvaluateEntryOpportunity(current_features)
def ManageExistingPosition(self):
"""Manage an existing position with intelligent exit rules"""
if not self.in_position:
return
current_time = self.Time
current_price = self.Securities[self.symbol].Price
# Check if we should exit based on time
time_in_position = current_time - self.position_start_time
should_exit_by_time = time_in_position > self.max_position_duration
# Check if we should exit based on profit
profit_pct = (current_price / self.entry_price - 1) * 100 * (1 if self.Portfolio[self.symbol].IsLong else -1)
should_exit_by_profit = profit_pct > 2.0 # Exit at 2% profit
# Check if we should exit based on loss
should_exit_by_loss = profit_pct < -1.0 # Exit at 1% loss
# Check for smart money reversal
if len(self.smart_money_indicators) >= 5:
recent_smart_money = list(self.smart_money_indicators)[-5:]
smart_money_trend = np.mean(recent_smart_money)
position_is_long = self.Portfolio[self.symbol].IsLong
# Exit if smart money is distributing and we're long, or accumulating and we're short
should_exit_by_smart_money = (smart_money_trend < -0.3 and position_is_long) or (smart_money_trend > 0.3 and not position_is_long)
else:
should_exit_by_smart_money = False
# Check for order flow reversal
if len(self.order_flow_imbalance) >= 3:
recent_flow = list(self.order_flow_imbalance)[-3:]
flow_trend = np.mean(recent_flow)
position_is_long = self.Portfolio[self.symbol].IsLong
should_exit_by_order_flow = (flow_trend < -0.4 and position_is_long) or (flow_trend > 0.4 and not position_is_long)
else:
should_exit_by_order_flow = False
# Exit logic
if should_exit_by_time or should_exit_by_profit or should_exit_by_loss or should_exit_by_smart_money or should_exit_by_order_flow:
self.Liquidate(self.symbol)
self.in_position = False
# Determine exit reason
if should_exit_by_time:
exit_reason = "time"
elif should_exit_by_profit:
exit_reason = "profit target"
elif should_exit_by_loss:
exit_reason = "stop loss"
elif should_exit_by_smart_money:
exit_reason = "smart money reversal"
else:
exit_reason = "order flow reversal"
self.Debug(f"Exiting position at {current_price}. Reason: {exit_reason}. Profit: {profit_pct:.2f}%")
def EvaluateEntryOpportunity(self, features):
"""Evaluate if current conditions warrant a new position"""
if self.in_position:
return
# Check for strong order flow signals
order_flow_signal = 0
if len(self.order_flow_imbalance) >= 5:
recent_flow = list(self.order_flow_imbalance)[-5:]
if np.mean(recent_flow) > 0.4:
order_flow_signal = 1 # Strong buying pressure
elif np.mean(recent_flow) < -0.4:
order_flow_signal = -1 # Strong selling pressure
self.Debug(f"orderflow signal from EEO: {order_flow_signal}")
# Check for smart money indicators
smart_money_signal = 0
if len(self.smart_money_indicators) >= 5:
recent_smart_money = list(self.smart_money_indicators)[-5:]
if np.mean(recent_smart_money) > 0.3:
smart_money_signal = 1 # Accumulation
elif np.mean(recent_smart_money) < -0.3:
smart_money_signal = -1 # Distribution
self.Debug(f"smartmoney signal from EEO: {smart_money_signal}")
# Assess volume profile
volume_signal = 0
if features['volume_concentration'] > 0.3:
# High volume concentration at a specific price level
current_price = self.Securities[self.symbol].Price
high_volume_prices = sorted([(price, vol) for price, vol in self.volume_profile.items()],
key=lambda x: x[1], reverse=True)[:3]
# If current price is breaking through high volume area
if high_volume_prices and len(high_volume_prices) > 0:
nearest_high_vol_price = min(high_volume_prices, key=lambda x: abs(x[0] - current_price))[0]
if current_price > nearest_high_vol_price and features['imbalance_ratio'] > 0:
volume_signal = 1
elif current_price < nearest_high_vol_price and features['imbalance_ratio'] < 0:
volume_signal = -1
# Combine signals (weighted sum)
combined_signal = (0.4 * order_flow_signal + 0.4 * smart_money_signal + 0.2 * volume_signal)
# Only take strong directional signals
if abs(combined_signal) < 0.2:
return
# Direction determination
direction = 1 if combined_signal > 0 else -1
# Check market regime compatibility
if hasattr(self, 'current_regime'):
# Don't go long in distribution regime or short in accumulation regime
if abs(combined_signal) > 0.6:
self.Debug(f"Taking {direction > 0 and 'long' or 'short'} trade despite {self.current_regime['smart_money']} regime due to very strong signal")
elif (direction > 0 and self.current_regime['smart_money'] == 'distributing') or \
(direction < 0 and self.current_regime['smart_money'] == 'accumulating'):
self.Debug(f"Skipping {direction > 0 and 'long' or 'short'} trade due to incompatible {self.current_regime['smart_money']} regime")
return
# Set position
if direction > 0:
self.SetHoldings(self.symbol, self.position_size)
self.Debug(f"LONG Entry - Order Flow: {order_flow_signal}, Smart Money: {smart_money_signal}, Volume: {volume_signal}")
else:
self.SetHoldings(self.symbol, -self.position_size)
self.Debug(f"SHORT Entry - Order Flow: {order_flow_signal}, Smart Money: {smart_money_signal}, Volume: {volume_signal}")
# Update position tracking
self.in_position = True
self.entry_price = self.Securities[self.symbol].Price
self.position_start_time = self.Time
def OnOrderEvent(self, orderEvent):
"""Handle order events"""
if orderEvent.Status == OrderStatus.Filled:
self.Debug(f"Order {orderEvent.OrderId} {orderEvent.Status}: {orderEvent.FillQuantity} units at ${orderEvent.FillPrice}")# Variable MA Breakout Strategy
# Adapted from PineScript strategy "TImeframed Variable MA Breakout"
# Original by shua12
# Modified to use 10-minute consolidator
from AlgorithmImports import *
import numpy as np
from datetime import time, timedelta
class VariableMABreakout(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1) # Set start date
self.SetCash(100000) # Set starting cash
# Strategy parameters (matching the original)
self.vma_length = 25
self.start_time = 7
self.end_time = 12
self.leftBars = 15
self.rightBars = 4
self.tof = True # Can trade on flats
self.longs_checked = True # Trade Longs
self.shorts_checked = False
self.quantity = 0 # Trade Shorts
# Set up futures contract based on the correct documentation approach
self.UniverseSettings.Resolution = Resolution.MINUTE
self.UniverseSettings.Asynchronous = True
# Set up the SP Futures
self._future = self.AddFuture(Futures.Indices.SP_500_E_MINI,
extendedMarketHours=False, # Only use regular market hours
dataMappingMode=DataMappingMode.LastTradingDay,
dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
contractDepthOffset=0)
# Set the filter to use front month contracts
self._future.SetFilter(0, 60) # Look at contracts up to 60 days out
# Initialize consolidator dictionary for 10-minute bars
self.consolidators = {}
# Initialize data structures for Variable MA calculation
self.prices = {}
self.vma_values = {}
self.high_window = {}
self.low_window = {}
self.close_window = {}
# Initialize state dictionaries keyed by symbol
self.pdmS = {}
self.mdmS = {}
self.pdiS = {}
self.mdiS = {}
self.iS = {}
self.vma = {}
self.hprice = {}
self.lprice = {}
self.le = {}
self.se = {}
self.can_buy = {}
self.can_sell = {}
self.exit_buys = {}
self.exit_sells = {}
# Track the active symbol
self.active_symbol = None
# Set the time zone
self.SetTimeZone(TimeZones.NewYork)
# Debug counter
self.debug_counter = 0
def OnData(self, data):
"""Process data - log data reception for debugging"""
if self.debug_counter % 100 == 0: # Log every 100 data points
self.Debug(f"OnData called at {self.Time}, Active symbol: {self.active_symbol}")
if self.active_symbol and self.active_symbol in data:
self.Debug(f"Data received for {self.active_symbol}: {data[self.active_symbol].Close}")
self.debug_counter += 1
def CreateConsolidator(self, symbol):
"""Create and register a 10-minute consolidator for the given symbol"""
# Create 10-minute consolidator
consolidator = TradeBarConsolidator(timedelta(minutes=10))
# Register the consolidator update handler
consolidator.DataConsolidated += self.OnTenMinuteBar
# Subscribe the consolidator to updates for this symbol
self.SubscriptionManager.AddConsolidator(symbol, consolidator)
# Store reference to consolidator
self.consolidators[symbol] = consolidator
self.Debug(f"Created 10-minute consolidator for {symbol}")
return consolidator
def OnTenMinuteBar(self, sender, bar):
"""Event handler for 10-minute consolidated bars"""
symbol = bar.Symbol
self.Debug(f"10-minute bar received for {symbol} at {self.Time}: O:{bar.Open} H:{bar.High} L:{bar.Low} C:{bar.Close}")
# Only process if this is our active symbol
if symbol != self.active_symbol:
self.Debug(f"Skipping bar for {symbol} - not active symbol (active: {self.active_symbol})")
return
self.ProcessBar(symbol, bar)
def ProcessBar(self, symbol, bar):
"""Process a new bar for the given symbol"""
# Check if we're within trading hours
current_hour = self.Time.hour
if not (current_hour >= self.start_time and current_hour <= self.end_time):
self.Debug(f"Outside trading hours ({current_hour}), skipping bar processing")
return
self.Debug(f"Processing bar for {symbol} at {self.Time}")
# Initialize data structures for this symbol if needed
self.EnsureDataStructures(symbol)
# Add latest price data
self.prices[symbol].Add(bar.Close)
self.high_window[symbol].Add(bar.High)
self.low_window[symbol].Add(bar.Low)
self.close_window[symbol].Add(bar.Close)
# Calculate VMA
self.CalculateVMA(symbol, bar.Close)
# Check for pivot points
swh, swl = self.DetectPivots(symbol)
if swh is not None:
self.Debug(f"Pivot high detected: {swh}")
if swl is not None:
self.Debug(f"Pivot low detected: {swl}")
# Update trade signals
self.UpdateTradeSignals(symbol, swh, swl, bar.Close)
# Log current state
if symbol in self.can_buy and symbol in self.can_sell:
self.Debug(f"Trade signals - Can buy: {self.can_buy[symbol]}, Can sell: {self.can_sell[symbol]}")
self.Debug(f"VMA: {self.vma[symbol]:.2f}, Close: {bar.Close:.2f}")
# Execute trade logic
self.ExecuteTrades(symbol, bar)
def EnsureDataStructures(self, symbol):
"""Initialize data structures for a symbol if they don't already exist"""
if symbol not in self.prices:
self.Debug(f"Initializing data structures for {symbol}")
# Create rolling windows
self.prices[symbol] = RollingWindow[float](self.vma_length + 1)
self.vma_values[symbol] = RollingWindow[float](self.vma_length + 1)
self.high_window[symbol] = RollingWindow[float](self.leftBars + self.rightBars + 1)
self.low_window[symbol] = RollingWindow[float](self.leftBars + self.rightBars + 1)
self.close_window[symbol] = RollingWindow[float](self.leftBars + self.rightBars + 1)
# Initialize state variables
self.pdmS[symbol] = 0.0
self.mdmS[symbol] = 0.0
self.pdiS[symbol] = 0.0
self.mdiS[symbol] = 0.0
self.iS[symbol] = 0.0
self.vma[symbol] = 0.0
self.hprice[symbol] = 0.0
self.lprice[symbol] = 0.0
self.le[symbol] = False
self.se[symbol] = False
self.can_buy[symbol] = False
self.can_sell[symbol] = False
self.exit_buys[symbol] = False
self.exit_sells[symbol] = False
# Warm up with historical data (using 10-minute bars)
try:
history = self.History(symbol, self.vma_length + self.leftBars + self.rightBars + 1, Resolution.MINUTE)
# Create 10-minute bars from 1-minute history data for warmup
if not history.empty:
# Get consolidated 10-minute bars
history_10min = self.ConsolidateHistoryTo10Min(history, symbol)
# Use the consolidated 10-minute bars for warmup
for bar_data in history_10min:
self.high_window[symbol].Add(float(bar_data['high']))
self.low_window[symbol].Add(float(bar_data['low']))
self.close_window[symbol].Add(float(bar_data['close']))
self.prices[symbol].Add(float(bar_data['close']))
self.Debug(f"Warmed up {symbol} with {len(history_10min)} historical 10-minute bars")
else:
self.Debug(f"No historical data available for {symbol}")
except Exception as e:
self.Debug(f"Error warming up data for {symbol}: {str(e)}")
# Continue without warmup data
def ConsolidateHistoryTo10Min(self, history, symbol):
"""Convert 1-minute history data to 10-minute bars for warmup"""
consolidated_bars = []
if history.empty:
return consolidated_bars
# Filter for the specific symbol - handle MultiIndex properly
try:
# Check if we have a MultiIndex
if isinstance(history.index, pd.MultiIndex):
# Get the symbol level (usually level 1)
symbol_data = history.xs(symbol, level=1)
else:
# Single index, filter by symbol column if it exists
if 'symbol' in history.columns:
symbol_data = history[history['symbol'] == symbol]
else:
symbol_data = history
except (KeyError, IndexError):
# If symbol not found or indexing fails, return empty
self.Debug(f"Could not find symbol {symbol} in history data")
return consolidated_bars
if symbol_data.empty:
return consolidated_bars
# Ensure the index is a proper DatetimeIndex
if not isinstance(symbol_data.index, pd.DatetimeIndex):
self.Debug("History index is not DatetimeIndex, cannot consolidate")
return consolidated_bars
try:
# Group by 10-minute intervals
grouped = symbol_data.resample('10T')
for timestamp, group in grouped:
if not group.empty and len(group) > 0:
bar_data = {
'timestamp': timestamp,
'open': group['open'].iloc[0],
'high': group['high'].max(),
'low': group['low'].min(),
'close': group['close'].iloc[-1],
'volume': group['volume'].sum() if 'volume' in group.columns else 0
}
consolidated_bars.append(bar_data)
except Exception as e:
self.Debug(f"Error consolidating history data: {str(e)}")
# Fallback: just use the raw data points as-is
for i in range(min(len(symbol_data), 20)): # Limit to prevent too much data
row = symbol_data.iloc[i]
bar_data = {
'timestamp': symbol_data.index[i],
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close'],
'volume': row['volume'] if 'volume' in row else 0
}
consolidated_bars.append(bar_data)
return consolidated_bars
def CalculateVMA(self, symbol, src):
"""Calculate Variable Moving Average"""
if self.prices[symbol].IsReady:
k = 1.0 / self.vma_length
# Calculate PDM and MDM
pdm = max(src - self.prices[symbol][1], 0)
mdm = max(self.prices[symbol][1] - src, 0)
# Update smoothed PDM and MDM
self.pdmS[symbol] = (1 - k) * self.pdmS[symbol] + k * pdm
self.mdmS[symbol] = (1 - k) * self.mdmS[symbol] + k * mdm
# Calculate directional indicators
s = self.pdmS[symbol] + self.mdmS[symbol]
if s != 0:
pdi = self.pdmS[symbol] / s
mdi = self.mdmS[symbol] / s
else:
pdi = 0
mdi = 0
# Smooth directional indicators
self.pdiS[symbol] = (1 - k) * self.pdiS[symbol] + k * pdi
self.mdiS[symbol] = (1 - k) * self.mdiS[symbol] + k * mdi
# Calculate directional index
d = abs(self.pdiS[symbol] - self.mdiS[symbol])
s1 = self.pdiS[symbol] + self.mdiS[symbol]
if s1 != 0:
self.iS[symbol] = (1 - k) * self.iS[symbol] + k * d / s1
else:
self.iS[symbol] = (1 - k) * self.iS[symbol]
# Calculate volatility index
if self.vma_values[symbol].IsReady:
iS_values = [self.iS[symbol]]
for i in range(self.vma_length - 1):
# This is a simplification since we don't have iS history
# In real implementation, you should keep track of iS values
iS_values.append(self.iS[symbol] * 0.9**(i+1))
hhv = max(iS_values)
llv = min(iS_values)
d1 = hhv - llv
if d1 != 0:
vI = (self.iS[symbol] - llv) / d1
else:
vI = 0
# Calculate VMA
self.vma[symbol] = (1 - k * vI) * self.vma[symbol] + k * vI * src
else:
self.vma[symbol] = src
# Add to VMA rolling window
self.vma_values[symbol].Add(self.vma[symbol])
def DetectPivots(self, symbol):
"""Detect pivot highs and lows"""
# Need enough data points to find pivots
if not (self.high_window[symbol].IsReady and self.low_window[symbol].IsReady):
return None, None
# Check for pivot high
swh = None
is_pivot_high = True
pivot_idx = self.rightBars
pivot_high = self.high_window[symbol][pivot_idx]
for i in range(1, self.leftBars + 1):
if pivot_idx + i < self.high_window[symbol].Count and self.high_window[symbol][pivot_idx + i] >= pivot_high:
is_pivot_high = False
break
for i in range(1, self.rightBars + 1):
if pivot_idx - i >= 0 and self.high_window[symbol][pivot_idx - i] > pivot_high:
is_pivot_high = False
break
if is_pivot_high:
swh = pivot_high
# Check for pivot low
swl = None
is_pivot_low = True
pivot_low = self.low_window[symbol][pivot_idx]
for i in range(1, self.leftBars + 1):
if pivot_idx + i < self.low_window[symbol].Count and self.low_window[symbol][pivot_idx + i] <= pivot_low:
is_pivot_low = False
break
for i in range(1, self.rightBars + 1):
if pivot_idx - i >= 0 and self.low_window[symbol][pivot_idx - i] < pivot_low:
is_pivot_low = False
break
if is_pivot_low:
swl = pivot_low
return swh, swl
def UpdateTradeSignals(self, symbol, swh, swl, current_close):
"""Update trade signals based on pivot points and VMA"""
if not self.vma_values[symbol].IsReady:
return
security = self.Securities[symbol]
has_valid_data = (security.HasData and
security.Price > 0 and
security.High > 0 and
security.Low > 0)
if not has_valid_data:
return
# Update hprice and lprice if new pivot is detected
if swh is not None:
self.hprice[symbol] = swh
self.le[symbol] = current_close > self.vma[symbol]
elif self.le[symbol] and self.Securities[symbol].High > self.hprice[symbol]:
self.le[symbol] = False
if swl is not None:
self.lprice[symbol] = swl
self.se[symbol] = current_close < self.vma[symbol]
elif self.se[symbol] and self.Securities[symbol].Low < self.lprice[symbol]:
self.se[symbol] = False
# Check for flat conditions
allow_flats_long = (self.vma[symbol] > self.vma_values[symbol][1])
allow_flats_short = (self.vma[symbol] < self.vma_values[symbol][1])
if self.tof:
allow_flats_long = (self.vma[symbol] >= self.vma_values[symbol][1])
allow_flats_short = (self.vma[symbol] <= self.vma_values[symbol][1])
# Update trade signals
self.can_buy[symbol] = self.le[symbol] and allow_flats_long and self.longs_checked
self.can_sell[symbol] = self.se[symbol] and allow_flats_short and self.shorts_checked
self.exit_buys[symbol] = current_close < self.vma[symbol]
self.exit_sells[symbol] = current_close > self.vma[symbol]
def ExecuteTrades(self, symbol, bar):
"""Execute trades based on signals"""
# Skip if we don't have an active contract
if symbol != self.active_symbol:
return
if self.IsWarmingUp:
return
# Check for exit conditions first
if self.exit_buys[symbol] and self.Portfolio[symbol].IsLong:
self.Liquidate(symbol, "Close Longs")
self.Debug(f"Close Long at {bar.Close}")
if self.exit_sells[symbol] and self.Portfolio[symbol].IsShort:
self.Liquidate(symbol, "Close Shorts")
self.Debug(f"Close Short at {bar.Close}")
# Check for entry conditions
if self.can_buy[symbol] and not self.Portfolio[symbol].IsLong:
self.MarketOrder(symbol, 1)
self.Debug(f"Buy Long at {bar.Close}, hprice: {self.hprice[symbol]}")
if self.can_sell[symbol] and not self.Portfolio[symbol].IsShort:
self.MarketOrder(symbol, -1)
self.Debug(f"Sell Short at {bar.Close}, lprice: {self.lprice[symbol]}")
def OnSecuritiesChanged(self, changes):
"""Handle changes in securities (like when a new futures contract is selected)"""
self.Debug(f"OnSecuritiesChanged called with {len(changes.AddedSecurities)} added, {len(changes.RemovedSecurities)} removed")
# Handle removed securities (expired contracts)
for security in changes.RemovedSecurities:
symbol = security.Symbol
self.Debug(f"Removing security: {symbol}")
# Remove consolidator
if symbol in self.consolidators:
self.SubscriptionManager.RemoveConsolidator(symbol, self.consolidators[symbol])
self.consolidators.pop(symbol, None)
# Clean up data structures
if symbol in self.prices:
self.prices.pop(symbol, None)
self.vma_values.pop(symbol, None)
self.high_window.pop(symbol, None)
self.low_window.pop(symbol, None)
self.close_window.pop(symbol, None)
# Clean up state variables
self.pdmS.pop(symbol, None)
self.mdmS.pop(symbol, None)
self.pdiS.pop(symbol, None)
self.mdiS.pop(symbol, None)
self.iS.pop(symbol, None)
self.vma.pop(symbol, None)
self.hprice.pop(symbol, None)
self.lprice.pop(symbol, None)
self.le.pop(symbol, None)
self.se.pop(symbol, None)
self.can_buy.pop(symbol, None)
self.can_sell.pop(symbol, None)
self.exit_buys.pop(symbol, None)
self.exit_sells.pop(symbol, None)
# Handle added securities (new contracts)
for security in changes.AddedSecurities:
# Only handle futures contracts
if security.Symbol.SecurityType != SecurityType.Future:
continue
symbol = security.Symbol
self.Debug(f"Adding security: {symbol}")
# Check if this symbol should be our active symbol
# Either it's the mapped symbol or we don't have an active symbol yet
if symbol == self._future.Mapped or self.active_symbol is None:
self.active_symbol = symbol
self.Log(f"Set active symbol to {self.active_symbol}")
# Create consolidator for this symbol (it's now properly subscribed)
try:
self.CreateConsolidator(symbol)
if symbol == self.active_symbol:
self.Log(f"Created consolidator for active symbol {symbol}")
self.MarketOrder(symbol, self.quantity, tag="contract changeover")
else:
self.Debug(f"Created consolidator for symbol {symbol}")
except Exception as e:
self.Debug(f"Error creating consolidator for {symbol}: {str(e)}")
# Continue without this consolidator
def OnSymbolChangedEvents(self, symbolChangedEvents):
"""Handle symbol changes (rollover between contracts)"""
for symbol, changedEvent in symbolChangedEvents.items():
old_symbol = changedEvent.OldSymbol
new_symbol = changedEvent.NewSymbol
quantity = self.Portfolio[old_symbol].Quantity
self.Debug(f"Symbol rollover: {old_symbol} -> {new_symbol}")
# Remove old consolidator
if old_symbol in self.consolidators:
self.SubscriptionManager.RemoveConsolidator(old_symbol, self.consolidators[old_symbol])
self.consolidators.pop(old_symbol, None)
# Rollover: liquidate any position of the old mapped contract and switch to the newly mapped contract
tag = f"Rollover - Symbol changed at {self.Time}: {old_symbol} -> {new_symbol}"
self.Liquidate(old_symbol, tag=tag)
if quantity != 0:
self.quantity = quantity
# Update the active symbol but don't create consolidator yet
# The consolidator will be created in OnSecuritiesChanged when the new symbol is properly added
self.active_symbol = new_symbol
self.Log(f"Symbol changed: {old_symbol} -> {new_symbol}. Consolidator will be created when security is added.")
def ExitPositions(self):
"""Function to close all positions at the end of the day"""
#if self.active_symbol and self.Portfolio[self.active_symbol].Invested:
# self.Liquidate(self.active_symbol, "End of day liquidation")# Variable MA Breakout Strategy
# Adapted from PineScript strategy "TImeframed Variable MA Breakout"
# Original by shua12
# Modified to use 10-minute consolidator
from AlgorithmImports import *
import numpy as np
from datetime import time, timedelta
class VariableMABreakout(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1) # Set start date
self.SetCash(100000) # Set starting cash
# Strategy parameters (matching the original)
self.vma_length = 6
self.start_time = 7
self.end_time = 12
self.leftBars = 4
self.rightBars = 1
self.tof = False # Can trade on flats
self.longs_checked = True # Trade Longs
self.shorts_checked = False
self.quantity = 0 # Trade Shorts
# Set up futures contract based on the correct documentation approach
self.UniverseSettings.Resolution = Resolution.MINUTE
self.UniverseSettings.Asynchronous = True
# Set up the SP Futures
self._future = self.AddFuture(Futures.Indices.SP_500_E_MINI,
extendedMarketHours=False, # Only use regular market hours
dataMappingMode=DataMappingMode.LastTradingDay,
dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
contractDepthOffset=0)
# Set the filter to use front month contracts
self._future.SetFilter(0, 60) # Look at contracts up to 60 days out
# Initialize consolidator dictionary for 10-minute bars
self.consolidators = {}
# Initialize data structures for Variable MA calculation
self.prices = {}
self.vma_values = {}
self.high_window = {}
self.low_window = {}
self.close_window = {}
# Initialize state dictionaries keyed by symbol
self.pdmS = {}
self.mdmS = {}
self.pdiS = {}
self.mdiS = {}
self.iS = {}
self.vma = {}
self.hprice = {}
self.lprice = {}
self.le = {}
self.se = {}
self.can_buy = {}
self.can_sell = {}
self.exit_buys = {}
self.exit_sells = {}
# Track the active symbol
self.active_symbol = None
# Set the time zone
self.SetTimeZone(TimeZones.NewYork)
# Debug counter
self.debug_counter = 0
def OnData(self, data):
"""Process data - log data reception for debugging"""
if self.debug_counter % 100 == 0: # Log every 100 data points
self.Debug(f"OnData called at {self.Time}, Active symbol: {self.active_symbol}")
if self.active_symbol and self.active_symbol in data:
self.Debug(f"Data received for {self.active_symbol}: {data[self.active_symbol].Close}")
self.debug_counter += 1
def CreateConsolidator(self, symbol):
"""Create and register a 10-minute consolidator for the given symbol"""
# Create 10-minute consolidator
consolidator = TradeBarConsolidator(timedelta(minutes=10))
# Register the consolidator update handler
consolidator.DataConsolidated += self.OnTenMinuteBar
# Subscribe the consolidator to updates for this symbol
self.SubscriptionManager.AddConsolidator(symbol, consolidator)
# Store reference to consolidator
self.consolidators[symbol] = consolidator
self.Debug(f"Created 10-minute consolidator for {symbol}")
return consolidator
def OnTenMinuteBar(self, sender, bar):
"""Event handler for 10-minute consolidated bars"""
symbol = bar.Symbol
self.Debug(f"10-minute bar received for {symbol} at {self.Time}: O:{bar.Open} H:{bar.High} L:{bar.Low} C:{bar.Close}")
# Only process if this is our active symbol
if symbol != self.active_symbol:
self.Debug(f"Skipping bar for {symbol} - not active symbol (active: {self.active_symbol})")
return
self.ProcessBar(symbol, bar)
def ProcessBar(self, symbol, bar):
"""Process a new bar for the given symbol"""
# Check if we're within trading hours
current_hour = self.Time.hour
if not (current_hour >= self.start_time and current_hour <= self.end_time):
self.Debug(f"Outside trading hours ({current_hour}), skipping bar processing")
return
self.Debug(f"Processing bar for {symbol} at {self.Time}")
# Initialize data structures for this symbol if needed
self.EnsureDataStructures(symbol)
# Add latest price data
self.prices[symbol].Add(bar.Close)
self.high_window[symbol].Add(bar.High)
self.low_window[symbol].Add(bar.Low)
self.close_window[symbol].Add(bar.Close)
# Calculate VMA
self.CalculateVMA(symbol, bar.Close)
# Check for pivot points
swh, swl = self.DetectPivots(symbol)
if swh is not None:
self.Debug(f"Pivot high detected: {swh}")
if swl is not None:
self.Debug(f"Pivot low detected: {swl}")
# Update trade signals
self.UpdateTradeSignals(symbol, swh, swl, bar.Close)
# Log current state
if symbol in self.can_buy and symbol in self.can_sell:
self.Debug(f"Trade signals - Can buy: {self.can_buy[symbol]}, Can sell: {self.can_sell[symbol]}")
self.Debug(f"VMA: {self.vma[symbol]:.2f}, Close: {bar.Close:.2f}")
# Execute trade logic
self.ExecuteTrades(symbol, bar)
def EnsureDataStructures(self, symbol):
"""Initialize data structures for a symbol if they don't already exist"""
if symbol not in self.prices:
self.Debug(f"Initializing data structures for {symbol}")
# Create rolling windows
self.prices[symbol] = RollingWindow[float](self.vma_length + 1)
self.vma_values[symbol] = RollingWindow[float](self.vma_length + 1)
self.high_window[symbol] = RollingWindow[float](self.leftBars + self.rightBars + 1)
self.low_window[symbol] = RollingWindow[float](self.leftBars + self.rightBars + 1)
self.close_window[symbol] = RollingWindow[float](self.leftBars + self.rightBars + 1)
# Initialize state variables
self.pdmS[symbol] = 0.0
self.mdmS[symbol] = 0.0
self.pdiS[symbol] = 0.0
self.mdiS[symbol] = 0.0
self.iS[symbol] = 0.0
self.vma[symbol] = 0.0
self.hprice[symbol] = 0.0
self.lprice[symbol] = 0.0
self.le[symbol] = False
self.se[symbol] = False
self.can_buy[symbol] = False
self.can_sell[symbol] = False
self.exit_buys[symbol] = False
self.exit_sells[symbol] = False
# Warm up with historical data (using 10-minute bars)
try:
history = self.History(symbol, self.vma_length + self.leftBars + self.rightBars + 1, Resolution.MINUTE)
# Create 10-minute bars from 1-minute history data for warmup
if not history.empty:
# Get consolidated 10-minute bars
history_10min = self.ConsolidateHistoryTo10Min(history, symbol)
# Use the consolidated 10-minute bars for warmup
for bar_data in history_10min:
self.high_window[symbol].Add(float(bar_data['high']))
self.low_window[symbol].Add(float(bar_data['low']))
self.close_window[symbol].Add(float(bar_data['close']))
self.prices[symbol].Add(float(bar_data['close']))
self.Debug(f"Warmed up {symbol} with {len(history_10min)} historical 10-minute bars")
else:
self.Debug(f"No historical data available for {symbol}")
except Exception as e:
self.Debug(f"Error warming up data for {symbol}: {str(e)}")
# Continue without warmup data
def ConsolidateHistoryTo10Min(self, history, symbol):
"""Convert 1-minute history data to 10-minute bars for warmup"""
consolidated_bars = []
if history.empty:
return consolidated_bars
# Filter for the specific symbol - handle MultiIndex properly
try:
# Check if we have a MultiIndex
if isinstance(history.index, pd.MultiIndex):
# Get the symbol level (usually level 1)
symbol_data = history.xs(symbol, level=1)
else:
# Single index, filter by symbol column if it exists
if 'symbol' in history.columns:
symbol_data = history[history['symbol'] == symbol]
else:
symbol_data = history
except (KeyError, IndexError):
# If symbol not found or indexing fails, return empty
self.Debug(f"Could not find symbol {symbol} in history data")
return consolidated_bars
if symbol_data.empty:
return consolidated_bars
# Ensure the index is a proper DatetimeIndex
if not isinstance(symbol_data.index, pd.DatetimeIndex):
self.Debug("History index is not DatetimeIndex, cannot consolidate")
return consolidated_bars
try:
# Group by 10-minute intervals
grouped = symbol_data.resample('10T')
for timestamp, group in grouped:
if not group.empty and len(group) > 0:
bar_data = {
'timestamp': timestamp,
'open': group['open'].iloc[0],
'high': group['high'].max(),
'low': group['low'].min(),
'close': group['close'].iloc[-1],
'volume': group['volume'].sum() if 'volume' in group.columns else 0
}
consolidated_bars.append(bar_data)
except Exception as e:
self.Debug(f"Error consolidating history data: {str(e)}")
# Fallback: just use the raw data points as-is
for i in range(min(len(symbol_data), 20)): # Limit to prevent too much data
row = symbol_data.iloc[i]
bar_data = {
'timestamp': symbol_data.index[i],
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close'],
'volume': row['volume'] if 'volume' in row else 0
}
consolidated_bars.append(bar_data)
return consolidated_bars
def CalculateVMA(self, symbol, src):
"""Calculate Variable Moving Average"""
if self.prices[symbol].IsReady:
k = 1.0 / self.vma_length
# Calculate PDM and MDM
pdm = max(src - self.prices[symbol][1], 0)
mdm = max(self.prices[symbol][1] - src, 0)
# Update smoothed PDM and MDM
self.pdmS[symbol] = (1 - k) * self.pdmS[symbol] + k * pdm
self.mdmS[symbol] = (1 - k) * self.mdmS[symbol] + k * mdm
# Calculate directional indicators
s = self.pdmS[symbol] + self.mdmS[symbol]
if s != 0:
pdi = self.pdmS[symbol] / s
mdi = self.mdmS[symbol] / s
else:
pdi = 0
mdi = 0
# Smooth directional indicators
self.pdiS[symbol] = (1 - k) * self.pdiS[symbol] + k * pdi
self.mdiS[symbol] = (1 - k) * self.mdiS[symbol] + k * mdi
# Calculate directional index
d = abs(self.pdiS[symbol] - self.mdiS[symbol])
s1 = self.pdiS[symbol] + self.mdiS[symbol]
if s1 != 0:
self.iS[symbol] = (1 - k) * self.iS[symbol] + k * d / s1
else:
self.iS[symbol] = (1 - k) * self.iS[symbol]
# Calculate volatility index
if self.vma_values[symbol].IsReady:
iS_values = [self.iS[symbol]]
for i in range(self.vma_length - 1):
# This is a simplification since we don't have iS history
# In real implementation, you should keep track of iS values
iS_values.append(self.iS[symbol] * 0.9**(i+1))
hhv = max(iS_values)
llv = min(iS_values)
d1 = hhv - llv
if d1 != 0:
vI = (self.iS[symbol] - llv) / d1
else:
vI = 0
# Calculate VMA
self.vma[symbol] = (1 - k * vI) * self.vma[symbol] + k * vI * src
else:
self.vma[symbol] = src
# Add to VMA rolling window
self.vma_values[symbol].Add(self.vma[symbol])
def DetectPivots(self, symbol):
"""Detect pivot highs and lows"""
# Need enough data points to find pivots
if not (self.high_window[symbol].IsReady and self.low_window[symbol].IsReady):
return None, None
# Check for pivot high
swh = None
is_pivot_high = True
pivot_idx = self.rightBars
pivot_high = self.high_window[symbol][pivot_idx]
for i in range(1, self.leftBars + 1):
if pivot_idx + i < self.high_window[symbol].Count and self.high_window[symbol][pivot_idx + i] >= pivot_high:
is_pivot_high = False
break
for i in range(1, self.rightBars + 1):
if pivot_idx - i >= 0 and self.high_window[symbol][pivot_idx - i] > pivot_high:
is_pivot_high = False
break
if is_pivot_high:
swh = pivot_high
# Check for pivot low
swl = None
is_pivot_low = True
pivot_low = self.low_window[symbol][pivot_idx]
for i in range(1, self.leftBars + 1):
if pivot_idx + i < self.low_window[symbol].Count and self.low_window[symbol][pivot_idx + i] <= pivot_low:
is_pivot_low = False
break
for i in range(1, self.rightBars + 1):
if pivot_idx - i >= 0 and self.low_window[symbol][pivot_idx - i] < pivot_low:
is_pivot_low = False
break
if is_pivot_low:
swl = pivot_low
return swh, swl
def UpdateTradeSignals(self, symbol, swh, swl, current_close):
"""Update trade signals based on pivot points and VMA"""
if not self.vma_values[symbol].IsReady:
return
security = self.Securities[symbol]
has_valid_data = (security.HasData and
security.Price > 0 and
security.High > 0 and
security.Low > 0)
if not has_valid_data:
return
# Update hprice and lprice if new pivot is detected
if swh is not None:
self.hprice[symbol] = swh
self.le[symbol] = current_close > self.vma[symbol]
elif self.le[symbol] and self.Securities[symbol].High > self.hprice[symbol]:
self.le[symbol] = False
if swl is not None:
self.lprice[symbol] = swl
self.se[symbol] = current_close < self.vma[symbol]
elif self.se[symbol] and self.Securities[symbol].Low < self.lprice[symbol]:
self.se[symbol] = False
# Check for flat conditions
allow_flats_long = (self.vma[symbol] > self.vma_values[symbol][1])
allow_flats_short = (self.vma[symbol] < self.vma_values[symbol][1])
if self.tof:
allow_flats_long = (self.vma[symbol] >= self.vma_values[symbol][1])
allow_flats_short = (self.vma[symbol] <= self.vma_values[symbol][1])
# Update trade signals
self.can_buy[symbol] = self.le[symbol] and allow_flats_long and self.longs_checked
self.can_sell[symbol] = self.se[symbol] and allow_flats_short and self.shorts_checked
self.exit_buys[symbol] = current_close < self.vma[symbol]
self.exit_sells[symbol] = current_close > self.vma[symbol]
def ExecuteTrades(self, symbol, bar):
"""Execute trades based on signals"""
# Skip if we don't have an active contract
if symbol != self.active_symbol:
return
if self.IsWarmingUp:
return
# Check for exit conditions first
if self.exit_buys[symbol] and self.Portfolio[symbol].IsLong:
self.Liquidate(symbol, "Close Longs")
self.Debug(f"Close Long at {bar.Close}")
if self.exit_sells[symbol] and self.Portfolio[symbol].IsShort:
self.Liquidate(symbol, "Close Shorts")
self.Debug(f"Close Short at {bar.Close}")
# Check for entry conditions
if self.can_buy[symbol] and not self.Portfolio[symbol].IsLong:
self.MarketOrder(symbol, 3)
self.Debug(f"Buy Long at {bar.Close}, hprice: {self.hprice[symbol]}")
if self.can_sell[symbol] and not self.Portfolio[symbol].IsShort:
self.MarketOrder(symbol, -3)
self.Debug(f"Sell Short at {bar.Close}, lprice: {self.lprice[symbol]}")
def OnSecuritiesChanged(self, changes):
"""Handle changes in securities (like when a new futures contract is selected)"""
self.Debug(f"OnSecuritiesChanged called with {len(changes.AddedSecurities)} added, {len(changes.RemovedSecurities)} removed")
# Handle removed securities (expired contracts)
for security in changes.RemovedSecurities:
symbol = security.Symbol
self.Debug(f"Removing security: {symbol}")
# Remove consolidator
if symbol in self.consolidators:
self.SubscriptionManager.RemoveConsolidator(symbol, self.consolidators[symbol])
self.consolidators.pop(symbol, None)
# Clean up data structures
if symbol in self.prices:
self.prices.pop(symbol, None)
self.vma_values.pop(symbol, None)
self.high_window.pop(symbol, None)
self.low_window.pop(symbol, None)
self.close_window.pop(symbol, None)
# Clean up state variables
self.pdmS.pop(symbol, None)
self.mdmS.pop(symbol, None)
self.pdiS.pop(symbol, None)
self.mdiS.pop(symbol, None)
self.iS.pop(symbol, None)
self.vma.pop(symbol, None)
self.hprice.pop(symbol, None)
self.lprice.pop(symbol, None)
self.le.pop(symbol, None)
self.se.pop(symbol, None)
self.can_buy.pop(symbol, None)
self.can_sell.pop(symbol, None)
self.exit_buys.pop(symbol, None)
self.exit_sells.pop(symbol, None)
# Handle added securities (new contracts)
for security in changes.AddedSecurities:
# Only handle futures contracts
if security.Symbol.SecurityType != SecurityType.Future:
continue
symbol = security.Symbol
self.Debug(f"Adding security: {symbol}")
# Check if this symbol should be our active symbol
# Either it's the mapped symbol or we don't have an active symbol yet
if symbol == self._future.Mapped or self.active_symbol is None:
self.active_symbol = symbol
self.Log(f"Set active symbol to {self.active_symbol}")
# Create consolidator for this symbol (it's now properly subscribed)
try:
self.CreateConsolidator(symbol)
if symbol == self.active_symbol:
self.Log(f"Created consolidator for active symbol {symbol}")
self.MarketOrder(symbol, self.quantity, tag="contract changeover")
else:
self.Debug(f"Created consolidator for symbol {symbol}")
except Exception as e:
self.Debug(f"Error creating consolidator for {symbol}: {str(e)}")
# Continue without this consolidator
def OnSymbolChangedEvents(self, symbolChangedEvents):
"""Handle symbol changes (rollover between contracts)"""
for symbol, changedEvent in symbolChangedEvents.items():
old_symbol = changedEvent.OldSymbol
new_symbol = changedEvent.NewSymbol
quantity = self.Portfolio[old_symbol].Quantity
self.Debug(f"Symbol rollover: {old_symbol} -> {new_symbol}")
# Remove old consolidator
if old_symbol in self.consolidators:
self.SubscriptionManager.RemoveConsolidator(old_symbol, self.consolidators[old_symbol])
self.consolidators.pop(old_symbol, None)
# Rollover: liquidate any position of the old mapped contract and switch to the newly mapped contract
tag = f"Rollover - Symbol changed at {self.Time}: {old_symbol} -> {new_symbol}"
self.Liquidate(old_symbol, tag=tag)
if quantity != 0:
self.quantity = quantity
# Update the active symbol but don't create consolidator yet
# The consolidator will be created in OnSecuritiesChanged when the new symbol is properly added
self.active_symbol = new_symbol
self.Log(f"Symbol changed: {old_symbol} -> {new_symbol}. Consolidator will be created when security is added.")
def ExitPositions(self):
"""Function to close all positions at the end of the day"""
#if self.active_symbol and self.Portfolio[self.active_symbol].Invested:
# self.Liquidate(self.active_symbol, "End of day liquidation")