# | Overall Statistics          |                    |
# |-----------------------------|--------------------|
# | Total Orders                | 586                |
# | Average Win                 | 0.86%              |
# | Average Loss                | -1.21%             |
# | Compounding Annual Return   | 1.027%             |
# | Drawdown                    | 35.400%            |
# | Expectancy                  | 0.028              |
# | Start Equity                | 10000              |
# | End Equity                  | 10628.96           |
# | Net Profit                  | 6.290%             |
# | Sharpe Ratio                | -0.152             |
# | Sortino Ratio               | -0.148             |
# | Probabilistic Sharpe Ratio  | 0.493%             |
# | Loss Rate                   | 40%                |
# | Win Rate                    | 60%                |
# | Profit-Loss Ratio           | 0.71               |
# | Alpha                       | -0.016             |
# | Beta                        | 0.006              |
# | Annual Standard Deviation   | 0.098              |
# | Annual Variance             | 0.01               |
# | Information Ratio           | -0.621             |
# | Tracking Error              | 0.191              |
# | Treynor Ratio               | -2.317             |
# | Total Fees                  | $577.00            |
# | Estimated Strategy Capacity | $750000000.00      |
# | Lowest Capacity Asset       | MSTR RBGP9S2961YD  |
# | Portfolio Turnover          | 5.12%              |
from AlgorithmImports import *
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import pandas as pd
class MLTradingAlgorithm(QCAlgorithm):
    """Random-forest long-only strategy on MSTR with a trailing stop.

    A rolling window of daily bars is turned into technical features
    (Bollinger Bands, 30-day historical volatility and either RSI or two
    SMAs). A RandomForestClassifier is refit every Monday to predict whether
    the next close will be higher than the current one. A prediction of 1
    opens a partial long position; a prediction of 0 closes it. While long,
    a stop-market order trails the highest close seen since entry.
    """

    # Number of daily bars kept in the rolling window (also the warm-up length).
    LOOKBACK_BARS = 200
    # Minimum number of labelled rows required before the model is (re)fit.
    MIN_TRAINING_ROWS = 50

    def Initialize(self):
        """Configure the backtest, the data subscription, the model and the schedule."""
        # 1. Setup Algorithm Parameters
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(10000)  # Enough capital for partial allocation

        # 2. Partial allocation (fraction of portfolio value used per position)
        self.allocation = 0.2

        # 3. Add Equity
        self.symbol = self.AddEquity("MSTR", Resolution.Daily).Symbol

        # 4. Rolling window of TradeBar data, pre-filled by the warm-up period
        self.data = RollingWindow[TradeBar](self.LOOKBACK_BARS)
        self.SetWarmUp(self.LOOKBACK_BARS)

        # 5. ML Model (Random Forest)
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.is_model_trained = False
        self.training_count = 0

        # 6. Schedule Weekly Retraining
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.At(10, 0),
            self.TrainModel,
        )

        # 7. Trailing Stop Logic
        self.highestPrice = 0
        self.trailStopTicket = None
        # Stop price as a fraction of the highest close: 0.95 keeps the stop
        # 5% below the high. (A value of 1 would put the stop at the high
        # itself, so it would trigger on any pullback.)
        self.trailing_stop_pct = 0.95

        # 8. Configurable usage of RSI or SMA
        # Set True to use RSI, False to use short/long SMAs
        self.useRSI = True

    def OnData(self, data):
        """Record the new bar, predict the next move and manage the position.

        Args:
            data: The data slice for the current time step.
        """
        # Check for valid data
        if not data.ContainsKey(self.symbol):
            return
        trade_bar = data[self.symbol]
        if trade_bar is None:
            return

        # Update rolling window
        self.data.Add(trade_bar)
        if not self.data.IsReady or self.data.Count < self.LOOKBACK_BARS:
            return

        # Skip if model not trained
        if not self.is_model_trained:
            return

        # Build features for the latest bar
        df = self.GetFeatureDataFrame()
        if df is None or df.empty:
            return

        # Keep the row as a one-row DataFrame so the column names match the
        # ones the model was fitted with.
        latest_features = df.iloc[[-1], :-1]
        try:
            prediction = self.model.predict(latest_features)[0]  # 1 = Buy, 0 = Sell
        except Exception as err:
            # Best effort: skip this bar, but leave a trace instead of
            # silently swallowing the failure.
            self.Debug(f"Prediction failed: {err}")
            return

        holdings = self.Portfolio[self.symbol].Quantity

        # -----------------------------
        # Trading Logic
        # -----------------------------
        # The three branches are mutually exclusive on purpose: `holdings`
        # was read before any order was sent, so the trailing-stop
        # maintenance must not run after the position has just been closed.
        if prediction == 1 and holdings <= 0:
            # If we are short, first liquidate
            if holdings < 0:
                self.Liquidate(self.symbol)
                self.ResetTrailingStop()
            # Go long with partial allocation
            self.SetHoldings(self.symbol, self.allocation)
            # Reset trailing stop for new position
            self.highestPrice = trade_bar.Close
            # No-op while the entry order is still unfilled; the maintenance
            # branch below places the stop on a later bar in that case.
            self.PlaceOrUpdateTrailingStop()
        elif prediction == 0 and holdings > 0:
            # Liquidate if we hold a long position
            self.Liquidate(self.symbol)
            self.ResetTrailingStop()
        elif holdings > 0:
            # Holding long: ratchet the stop up on a new high, and make sure
            # a stop exists at all (it may not have been placed at entry).
            made_new_high = trade_bar.Close > self.highestPrice
            if made_new_high:
                self.highestPrice = trade_bar.Close
            if made_new_high or not self._HasActiveTrailingStop():
                self.PlaceOrUpdateTrailingStop()

    def TrainModel(self):
        """Refit the classifier on the current window and log its accuracy."""
        df = self.GetFeatureDataFrame()
        if df is None:
            self.Debug("Insufficient data for training.")
            return

        # The last row is the most recent bar: its next close is not known
        # yet, so its Target is only a placeholder and must not be learned.
        labelled = df.iloc[:-1]
        if len(labelled) < self.MIN_TRAINING_ROWS:
            self.Debug("Insufficient data for training.")
            return

        X = labelled.iloc[:, :-1]
        y = labelled.iloc[:, -1]

        # 80/20 split (time-based, no shuffle)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        # Fit the model
        self.model.fit(X_train, y_train)
        self.is_model_trained = True

        # Evaluate
        train_accuracy = accuracy_score(y_train, self.model.predict(X_train))
        test_accuracy = accuracy_score(y_test, self.model.predict(X_test))
        self.training_count += 1
        self.Debug(f"Training #{self.training_count}: "
                   f"Train Accuracy: {train_accuracy:.2%}, "
                   f"Test Accuracy: {test_accuracy:.2%}")

    def GetFeatureDataFrame(self):
        """Build the feature/target table from the rolling window.

        Features:
            - Bollinger Bands (20-day)
            - Historical Volatility (HV_30)
            - RSI (14-day) or short/long SMAs, depending on ``self.useRSI``

        Target (last column): 1 if the next close is higher than the current
        close, else 0. The final row has no next close, so its Target is a
        placeholder 0; use that row for prediction only, never for training.

        Returns:
            A DataFrame in chronological order (oldest first, latest bar
            last) with rows containing NaN features removed, or None when
            the window holds fewer than ``LOOKBACK_BARS`` bars.
        """
        if self.data.Count < self.LOOKBACK_BARS:
            return None

        # RollingWindow enumerates newest-first (index 0 is the latest bar).
        # Reverse it so rolling statistics and the shifted target run forward
        # in time and the last row is the most recent bar.
        close_prices = [bar.Close for bar in self.data][::-1]
        df = pd.DataFrame(close_prices, columns=["Close"])

        # ---------------------
        # 1) Bollinger Bands
        # ---------------------
        period = 20
        df["BB_mid"] = df["Close"].rolling(period).mean()
        df["BB_std"] = df["Close"].rolling(period).std()
        df["BB_upper"] = df["BB_mid"] + 2 * df["BB_std"]
        df["BB_lower"] = df["BB_mid"] - 2 * df["BB_std"]

        # ---------------------
        # 2) Historical Volatility (30-day, annualized)
        # ---------------------
        df["daily_returns"] = df["Close"].pct_change()
        df["HV_30"] = df["daily_returns"].rolling(window=30).std() * np.sqrt(252)

        # ---------------------
        # 3) RSI (if self.useRSI == True)
        #    or short/long SMAs (if self.useRSI == False)
        # ---------------------
        if self.useRSI:
            # RSI over 14 days, using simple moving averages of gains/losses
            delta = df["Close"].diff()
            gain = (delta.where(delta > 0, 0)).rolling(14).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
            rs = gain / loss
            df["RSI"] = 100 - (100 / (1 + rs))
        else:
            # Short/Long SMAs (10-day and 50-day)
            df["SMA_10"] = df["Close"].rolling(10).mean()
            df["SMA_50"] = df["Close"].rolling(50).mean()

        # ---------------------
        # 4) Target (must stay the last column; callers slice on position)
        # ---------------------
        df["Target"] = (df["Close"].shift(-1) > df["Close"]).astype(int)

        # Cleanup: drop rows whose indicators are not yet defined
        df.dropna(inplace=True)
        # Remove daily_returns from final features
        df.drop(columns=["daily_returns"], inplace=True)
        return df

    # -----------------------------
    # Trailing Stop Methods
    # -----------------------------
    def _HasActiveTrailingStop(self):
        """Return True when a stop ticket exists and is not filled, canceled or invalid."""
        ticket = self.trailStopTicket
        if ticket is None:
            return False
        closed = (OrderStatus.Filled, OrderStatus.Canceled, OrderStatus.Invalid)
        return ticket.Status not in closed

    def PlaceOrUpdateTrailingStop(self):
        """Place the stop-market order, or move the existing one.

        Does nothing when there is no long position or when no high has been
        recorded yet (a stop price of zero would be meaningless).
        """
        quantity = self.Portfolio[self.symbol].Quantity
        if quantity <= 0 or self.highestPrice <= 0:
            return

        newStopPrice = self.highestPrice * self.trailing_stop_pct
        if self._HasActiveTrailingStop():
            # Update existing ticket
            updateFields = UpdateOrderFields()
            updateFields.StopPrice = newStopPrice
            self.trailStopTicket.Update(updateFields)
        else:
            # Create a new stop-market ticket (sells the entire long position if triggered)
            self.trailStopTicket = self.StopMarketOrder(self.symbol, -quantity, newStopPrice)

    def ResetTrailingStop(self):
        """Clear the trailing-stop state and cancel any still-active stop order.

        Called when the position is exited or reversed.
        """
        self.highestPrice = 0
        if self._HasActiveTrailingStop():
            self.trailStopTicket.Cancel("Exiting or reversing position.")
        self.trailStopTicket = None