| Overall Statistics | |
|---|---|
| Total Orders | 78 |
| Average Win | 44.03% |
| Average Loss | -11.66% |
| Compounding Annual Return | 78.756% |
| Drawdown | 69.900% |
| Expectancy | 2.062 |
| Start Equity | 10000 |
| End Equity | 325048.66 |
| Net Profit | 3150.487% |
| Sharpe Ratio | 1.301 |
| Sortino Ratio | 1.187 |
| Probabilistic Sharpe Ratio | 52.791% |
| Loss Rate | 36% |
| Win Rate | 64% |
| Profit-Loss Ratio | 3.78 |
| Alpha | 0.576 |
| Beta | 1.007 |
| Annual Standard Deviation | 0.524 |
| Annual Variance | 0.275 |
| Information Ratio | 1.159 |
| Tracking Error | 0.497 |
| Treynor Ratio | 0.677 |
| Total Fees | $6185.87 |
| Estimated Strategy Capacity | $320000000.00 |
| Lowest Capacity Asset | MSTR RBGP9S2961YD |
| Portfolio Turnover | 3.59% |
from AlgorithmImports import *
from sklearn.cluster import KMeans # K Means clustering model
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
# Custom fee model for 0.1% per trade
class PercentageFeeModel(FeeModel):
    """Fee model charging a flat 0.1% of each order's traded value."""

    def GetOrderFee(self, parameters):
        """Return the fee for the order carried by ``parameters``.

        The fee is 0.001 times the security price times the absolute order
        quantity, expressed in the security's quote currency.
        """
        asset = parameters.Security
        shares = abs(parameters.Order.Quantity)
        # Same multiplication order as price * quantity scaled by the rate.
        fee_amount = 0.001 * asset.Price * shares
        return OrderFee(CashAmount(fee_amount, asset.QuoteCurrency.Symbol))
class MLTradingAlgorithm(QCAlgorithm):
    """Long-only daily strategy driven by K-Means clusters of technical features.

    Every Monday the model is re-fitted on the rolling window of closes.  Each
    cluster is mapped to a buy (1) or sell (0) signal according to whether the
    majority of its training samples were followed by a higher close.  On each
    new bar the latest feature row is assigned to a cluster and the position
    is opened or liquidated accordingly.
    """

    def Initialize(self):
        """Configure dates, cash, securities, the model and the training schedule."""
        # Algorithm parameters
        self.SetStartDate(2019, 1, 1)   # Start date
        self.SetEndDate(2024, 12, 31)   # End date
        self.SetCash(10000)             # Initial capital

        # Number of daily bars kept for feature engineering / warm-up.
        self.lookback = 200

        # Configurable ticker symbols and allocation fraction.
        self.trading_ticker = self.GetParameter("trading_ticker", "MSTR")
        self.benchmark_ticker = self.GetParameter("benchmark_ticker", "SPY")
        # A float default selects the floating-point overload, so fractional
        # allocations such as "0.5" are honoured instead of silently falling
        # back to the default when integer parsing fails.
        self.allocation_percentage = float(
            self.GetParameter("allocation_percentage", 1.0)
        )

        # Trading equity with custom fee and zero-slippage models.
        trading_security = self.AddEquity(self.trading_ticker, Resolution.Daily)
        trading_security.SetFeeModel(PercentageFeeModel())
        trading_security.SetSlippageModel(ConstantSlippageModel(0))
        self.symbol = trading_security.Symbol

        # Benchmark equity with the same fee and slippage models.
        benchmark_security = self.AddEquity(self.benchmark_ticker, Resolution.Daily)
        benchmark_security.SetFeeModel(PercentageFeeModel())
        benchmark_security.SetSlippageModel(ConstantSlippageModel(0))
        self.benchmark_symbol = benchmark_security.Symbol

        # RollingWindow holding the most recent bars of the trading asset.
        self.data = RollingWindow[TradeBar](self.lookback)

        # Warm-up period so the window is full when trading starts.
        self.SetWarmUp(self.lookback)

        # KMeans clustering model with 2 clusters.
        self.model = KMeans(n_clusters=2, random_state=42)
        # Scaler for feature normalization (fitted in TrainModel).
        self.scaler = StandardScaler()
        # Maps each cluster label to a trading signal (1: buy, 0: sell).
        self.cluster_to_signal = {}
        self.training_count = 0
        self.is_model_trained = False  # Tracks if the model is trained

        # Schedule training every Monday at 10:00 AM.
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.At(10, 0),
            self.TrainModel,
        )

        # Bookkeeping for comparing closed trades against the benchmark.
        self.beat_benchmark_count = 0
        self.trade_entry_price = None      # Trading asset close at entry
        self.benchmark_entry_price = None  # Benchmark close at entry

    def _benchmark_close(self, data):
        """Return the benchmark close in ``data``, or None if it has no bar."""
        if data.Bars.ContainsKey(self.benchmark_symbol):
            bar = data.Bars[self.benchmark_symbol]
            if bar is not None:
                return bar.Close
        return None

    def _record_trade_vs_benchmark(self, trade_exit_price, benchmark_exit_price):
        """Increment the counter when the closing trade out-performed the benchmark."""
        if self.trade_entry_price is None or self.benchmark_entry_price is None:
            return
        if benchmark_exit_price is None:
            return
        trade_return = (
            (trade_exit_price - self.trade_entry_price) / self.trade_entry_price
        )
        benchmark_return = (
            (benchmark_exit_price - self.benchmark_entry_price)
            / self.benchmark_entry_price
        )
        if trade_return > benchmark_return:
            self.beat_benchmark_count += 1

    def OnData(self, data):
        """Store the new bar, predict the signal and open or close the position."""
        # Use the typed TradeBar collection: the window only accepts TradeBars.
        if not data.Bars.ContainsKey(self.symbol):
            return
        trade_bar = data.Bars[self.symbol]
        if trade_bar is None:
            return

        # Add TradeBar to the rolling window.
        self.data.Add(trade_bar)

        # Keep filling the window during warm-up but do not trade yet.
        if self.IsWarmingUp or not self.data.IsReady:
            return

        # Ensure model is trained before making predictions.
        if not self.is_model_trained:
            self.Debug("Model is not trained yet. Skipping prediction.")
            return

        df = self.GetFeatureDataFrame()
        if df is None or df.empty:
            return
        # If dropna() removed the newest row, the last remaining row would be
        # stale; skip rather than predict on an old bar.
        if df.index[-1] != self.data.Count - 1:
            return

        # Latest features (all columns except the target) as a 1 x n array.
        latest_features = df.iloc[[-1], :-1].to_numpy()

        try:
            latest_features_scaled = self.scaler.transform(latest_features)
            cluster_label = self.model.predict(latest_features_scaled)[0]
            # Unknown clusters default to the sell signal.
            prediction = self.cluster_to_signal.get(int(cluster_label), 0)
        except Exception as e:
            self.Debug(f"Error: Model prediction failed. {e}")
            return

        holdings = self.Portfolio[self.symbol].Quantity

        # Buy if prediction = 1 and not currently invested.
        if prediction == 1 and holdings <= 0:
            self.SetHoldings(self.symbol, self.allocation_percentage)
            # Record the entry prices for the trade and benchmark.
            self.trade_entry_price = trade_bar.Close
            self.benchmark_entry_price = self._benchmark_close(data)

        # Sell if prediction = 0 and currently invested.
        elif prediction == 0 and holdings > 0:
            self._record_trade_vs_benchmark(
                trade_bar.Close, self._benchmark_close(data)
            )
            # Reset entry prices after the trade is closed.
            self.trade_entry_price = None
            self.benchmark_entry_price = None
            self.Liquidate(self.symbol)

    def _signals_for(self, features_scaled):
        """Map scaled feature rows to trading signals via their predicted cluster."""
        labels = self.model.predict(features_scaled)
        return np.array(
            [self.cluster_to_signal.get(int(label), 0) for label in labels]
        )

    def TrainModel(self):
        """Re-fit the scaler and KMeans model and rebuild the cluster-to-signal map."""
        # Only rows whose next-day close is known carry a valid target.
        df = self.GetFeatureDataFrame(labeled_only=True)
        if df is None or len(df) < 50:  # Require enough data to train
            self.Debug("Insufficient data for training.")
            return

        # Features are all columns except the trailing 'Target' column.
        # NumPy arrays are used both here and at prediction time so the
        # scaler sees the same input type in fit and transform.
        X = df.iloc[:, :-1].to_numpy()
        y = df.iloc[:, -1].to_numpy()

        # Chronological split (no shuffle) for evaluation purposes.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        # Fit the scaler on the training rows only so that test-set
        # statistics do not leak into the evaluation.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        self.model.fit(X_train_scaled)

        # A cluster signals "buy" when more than half of its training samples
        # were followed by a higher close; otherwise it signals "sell".
        train_labels = self.model.labels_
        cluster_to_signal = {}
        for cluster in np.unique(train_labels):
            cluster_mean = y_train[train_labels == cluster].mean()
            cluster_to_signal[int(cluster)] = 1 if cluster_mean > 0.5 else 0
        self.cluster_to_signal = cluster_to_signal

        train_accuracy = accuracy_score(y_train, self._signals_for(X_train_scaled))
        test_accuracy = accuracy_score(y_test, self._signals_for(X_test_scaled))

        self.is_model_trained = True
        self.training_count += 1
        self.Debug(f"Training #{self.training_count}: "
                   f"Train Accuracy: {train_accuracy:.2%}, "
                   f"Test Accuracy: {test_accuracy:.2%}")

    def GetFeatureDataFrame(self, labeled_only=False):
        """Build the feature/target DataFrame from the rolling window.

        Rows are in chronological order (oldest first) and keep their
        positional index, so the newest bar has index ``Count - 1``.  Columns
        are Close, SMA_10, SMA_60, RSI, MACD, MACD_Signal, HV_30 and, last,
        Target (1 if the next close is higher than the current one, else 0).

        Args:
            labeled_only: When True, the newest row is removed because its
                next-day close is not known yet and its target would be a
                spurious 0.  Use this for training.

        Returns:
            The DataFrame with NaN rows removed, or None while the rolling
            window is not full.
        """
        if not self.data.IsReady:
            return None

        # RollingWindow enumerates newest-first; reverse it so that rolling
        # statistics, diffs and shift(-1) operate forward in time.
        close_prices = [float(bar.Close) for bar in self.data][::-1]
        df = pd.DataFrame({"Close": close_prices})

        # Moving averages
        df["SMA_10"] = df["Close"].rolling(window=10).mean()
        df["SMA_60"] = df["Close"].rolling(window=60).mean()

        # RSI over 14 periods using simple rolling means of gains and losses.
        delta = df["Close"].diff()
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df["RSI"] = 100 - (100 / (1 + rs))

        # MACD and its signal line
        ema_fast = df["Close"].ewm(span=12, adjust=False).mean()
        ema_slow = df["Close"].ewm(span=26, adjust=False).mean()
        df["MACD"] = ema_fast - ema_slow
        df["MACD_Signal"] = df["MACD"].ewm(span=9, adjust=False).mean()

        # Annualized 30-day historical volatility
        df["HV_30"] = (
            df["Close"].pct_change().rolling(window=30).std() * np.sqrt(252)
        )

        # Target: 1 if next day's Close > today's Close, else 0.
        df["Target"] = (df["Close"].shift(-1) > df["Close"]).astype(int)

        if labeled_only:
            df = df.iloc[:-1]

        # Remove rows with NaN values (indicator warm-up rows).
        return df.dropna()

    def OnEndOfAlgorithm(self):
        """Log how many closed trades out-performed the benchmark."""
        self.Log(f"Number of times strategy beat {self.benchmark_ticker}: {self.beat_benchmark_count}")