| Overall Statistics |
|
Total Orders 226 Average Win 1.40% Average Loss -1.27% Compounding Annual Return -0.294% Drawdown 31.100% Expectancy -0.023 Start Equity 100000 End Equity 98157.93 Net Profit -1.842% Sharpe Ratio -0.2 Sortino Ratio -0.183 Probabilistic Sharpe Ratio 0.208% Loss Rate 54% Win Rate 46% Profit-Loss Ratio 1.10 Alpha -0.047 Beta 0.24 Annual Standard Deviation 0.132 Annual Variance 0.017 Information Ratio -0.617 Tracking Error 0.181 Treynor Ratio -0.11 Total Fees $320.83 Estimated Strategy Capacity $78000000.00 Lowest Capacity Asset IYR RVLEALAHHC2T Portfolio Turnover 3.54% Drawdown Recovery 1184 |
#region imports
from AlgorithmImports import *
#endregion
class CloseOnCloseExecutionModel(ExecutionModel):
"""
Provides an implementation of IExecutionModel that immediately submits a market order to achieve
the desired portfolio targets and an associated market on close order.
"""
def __init__(self):
self.targetsCollection = PortfolioTargetCollection()
self.invested_symbols = []
def Execute(self, algorithm, targets):
"""
Immediately submits orders for the specified portfolio targets.
Input:
- algorithm
Algorithm instance running the backtest
- targets
The portfolio targets to be ordered
"""
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
self.targetsCollection.AddRange(targets)
if self.targetsCollection.Count > 0:
for target in self.targetsCollection.OrderByMarginImpact(algorithm):
# calculate remaining quantity to be ordered
quantity = OrderSizing.GetUnorderedQuantity(algorithm, target)
if quantity == 0:
continue
algorithm.MarketOrder(target.Symbol, quantity)
algorithm.MarketOnCloseOrder(target.Symbol, -quantity)
self.targetsCollection.ClearFulfilled(algorithm)#region imports
from AlgorithmImports import *
#endregion
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, Dense, Lambda, Flatten, Concatenate
from tensorflow.keras import Model
from tensorflow.keras import metrics
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras import utils
from sklearn.preprocessing import StandardScaler
import numpy as np
import math
# varibles (aka features in ML lingo) used to make predictions
input_vars = ['open', 'high', 'low', 'close', 'volume']
class Direction:
'''Constants used for labeling price movements'''
# labels must be integers because Keras (and most ML Libraries)
# only work with numbers
UP = 0
DOWN = 1
STATIONARY = 2
class MyTemporalCNN:
'''Temporal Convolutional Neural Network Model built upon Keras'''
# the name describes the architecture of the Neural Network model
# Temporal refers to the fact the layers are separated temporally into three regions
# Convolutional refers to the fact Convolutional layers are used to extract features
def __init__(self, n_tsteps = 15):
# n_tsteps = number of time steps in time series for one input/prediction
self.n_tsteps = n_tsteps
self.scaler = StandardScaler() # used for Feature Scaling
self.__CreateModel()
def __CreateModel(self):
'''Creates the neural network model'''
inputs = Input(shape=(self.n_tsteps, len(input_vars)))
# extract our features using a Convolutional layers, hence "CNN"
feature_extraction = Conv1D(30, 4, activation='relu')(inputs)
# split layer into three regions based on time, hence "Temporal"
long_term = Lambda( lambda x: tf.split(x, num_or_size_splits=3, axis=1)[0])(feature_extraction)
mid_term = Lambda( lambda x: tf.split(x, num_or_size_splits=3, axis=1)[1])(feature_extraction)
short_term = Lambda( lambda x: tf.split(x, num_or_size_splits=3, axis=1)[2])(feature_extraction)
long_term_conv = Conv1D(1, 1, activation='relu')(long_term)
mid_term_conv = Conv1D(1, 1, activation='relu')(mid_term)
short_term_conv = Conv1D(1, 1, activation='relu')(short_term)
# combine three layers back into one
combined = Concatenate(axis=1)([long_term_conv, mid_term_conv, short_term_conv])
# flattening is required since our input is a 2D matrix
flattened = Flatten()(combined)
# 1 output neuron for each class (Up, Stationary, Down --- see Direction class)
outputs = Dense(3, activation='softmax')(flattened)
# specify input and output layers of our model
self.model = Model(inputs=inputs, outputs=outputs)
# compile our model
self.model.compile(optimizer='adam',
loss=CategoricalCrossentropy(from_logits=True))
def __PrepareData(self, data, rolling_avg_window_size=5, stationary_threshold=.0001):
'''Prepares the data for a format friendly for our model'''
# rolling_avg_window_size = window size for the future mid prices to average,
# this average is what the model wants to predict
# stationary_threshold = maximum change of movement to be considered stationary
# for the average mid price stated above
df = data[input_vars]
shift = -(rolling_avg_window_size-1)
# function we will use to label our data (used in line )
def label_data(row):
if row['close_avg_change_pct'] > stationary_threshold:
return Direction.UP
elif row['close_avg_change_pct'] < -stationary_threshold:
return Direction.DOWN
else:
return Direction.STATIONARY
# compute the % change in the average of the close of the future 5 time steps
# at each time step
df['close_avg'] = df['close'].rolling(window=rolling_avg_window_size).mean().shift(shift)
df['close_avg_change_pct'] = (df['close_avg'] - df['close']) / df['close']
# label data based on direction,
# axis=1 signifies a row-wise operation (axis=0 is col-wise)
df['movement_labels'] = df.apply(label_data, axis=1)
# lists to store each 2D input matrix and the corresponding label
data = []
labels = []
for i in range(len(df)-self.n_tsteps+1+shift):
label = df['movement_labels'].iloc[i+self.n_tsteps-1]
data.append(df[input_vars].iloc[i:i+self.n_tsteps].values)
labels.append(label)
data = np.array(data)
# temporarily reshape data to 2D,
# necessary because sklearn only works wtih 2D data
dim1, dim2, dim3 = data.shape
data = data.reshape(dim1*dim2, dim3)
# fit our scaler and transform our data in one method call
data = self.scaler.fit_transform(data)
# return data to original shape
data = data.reshape(dim1, dim2, dim3)
# Keras needs dummy matrices for classification problems,
# hence the need for to_categorical()
# num classes ensures our dummy matrix has 3 columns,
# one for each label (Up, Down, Stationary)
return data, utils.to_categorical(labels, num_classes=3)
def Train(self, data):
'''Trains the model'''
data, labels = self.__PrepareData(data)
self.model.fit(data, labels, epochs=20)
def Predict(self, input_data):
'''Makes a prediction on the direction of the future stock price'''
input_data = self.scaler.transform(input_data.fillna(method='ffill').values)
prediction = self.model.predict(input_data[np.newaxis, :])[0]
direction = np.argmax(prediction)
confidence = prediction[direction]
return direction, confidence#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
class TinyTemporalCNN(nn.Module):
def __init__(self, number_of_features):
super(TinyTemporalCNN, self).__init__()
self.network = nn.Sequential(
nn.Conv1d(number_of_features, 16, kernel_size=3, padding=1),
nn.ReLU(),
nn.Dropout(0.10),
nn.Conv1d(16, 8, kernel_size=3, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool1d(1)
)
self.classifier = nn.Linear(8, 2)
def forward(self, x):
x = self.network(x)
x = x.squeeze(-1)
return self.classifier(x)
class CNNModelWrapper:
def __init__(self, sequence_length, prediction_horizon, epochs, learning_rate):
self.sequence_length = sequence_length
self.prediction_horizon = prediction_horizon
self.epochs = epochs
self.learning_rate = learning_rate
self.model = None
self.feature_mean = None
self.feature_std = None
self.is_trained = False
def Train(self, data):
features, labels = self.CreateTrainingSet(data)
if features is None or len(features) < 20:
self.is_trained = False
return False
self.feature_mean = features.mean(axis=(0, 1), keepdims=True)
self.feature_std = features.std(axis=(0, 1), keepdims=True)
self.feature_std[self.feature_std == 0] = 1.0
features = (features - self.feature_mean) / self.feature_std
x_train = torch.tensor(features, dtype=torch.float32)
x_train = x_train.permute(0, 2, 1)
y_train = torch.tensor(labels, dtype=torch.long)
self.model = TinyTemporalCNN(features.shape[2])
optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
loss_function = nn.CrossEntropyLoss()
self.model.train()
for _ in range(self.epochs):
optimizer.zero_grad()
output = self.model(x_train)
loss = loss_function(output, y_train)
loss.backward()
optimizer.step()
self.is_trained = True
return True
def Predict(self, data):
if not self.is_trained:
return 0, 0.0
features = self.CreatePredictionSet(data)
if features is None:
return 0, 0.0
features = (features - self.feature_mean) / self.feature_std
x = torch.tensor(features, dtype=torch.float32)
x = x.permute(0, 2, 1)
self.model.eval()
with torch.no_grad():
logits = self.model(x)
probabilities = torch.softmax(logits, dim=1).numpy()[0]
down_probability = probabilities[0]
up_probability = probabilities[1]
if up_probability >= down_probability:
return 1, float(up_probability)
return -1, float(down_probability)
def CreateFeatureFrame(self, data):
required = ["open", "high", "low", "close", "volume"]
for column in required:
if column not in data.columns:
return None
frame = pd.DataFrame(index=data.index)
frame["return_1d"] = data["close"].pct_change()
frame["range"] = (data["high"] - data["low"]) / data["close"]
frame["close_open"] = (data["close"] - data["open"]) / data["open"]
frame["volume_change"] = data["volume"].pct_change()
frame = frame.replace([np.inf, -np.inf], np.nan)
frame = frame.dropna()
return frame
def CreateTrainingSet(self, data):
feature_frame = self.CreateFeatureFrame(data)
if feature_frame is None:
return None, None
close = data["close"].loc[feature_frame.index]
future_return = close.shift(-self.prediction_horizon) / close - 1.0
labels = (future_return > 0).astype(int)
features = feature_frame.values
x_samples = []
y_samples = []
for i in range(self.sequence_length, len(feature_frame) - self.prediction_horizon):
x_samples.append(features[i - self.sequence_length:i])
y_samples.append(int(labels.iloc[i]))
if len(x_samples) < 20:
return None, None
return np.array(x_samples), np.array(y_samples)
def CreatePredictionSet(self, data):
feature_frame = self.CreateFeatureFrame(data)
if feature_frame is None:
return None
if len(feature_frame) < self.sequence_length:
return None
latest_window = feature_frame.values[-self.sequence_length:]
return np.array([latest_window])
class SelfContainedCNNETFStrategy(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 15)
self.SetEndDate(2026, 5, 5)
self.initial_cash = 100000
self.SetCash(self.initial_cash)
# ------------------------------------------------------------
# Parameters
# ------------------------------------------------------------
self.training_lookback = self.GetIntParameter("training_lookback", 500)
self.sequence_length = self.GetIntParameter("sequence_length", 30)
self.prediction_horizon = self.GetIntParameter("prediction_horizon", 5)
self.training_epochs = self.GetIntParameter("training_epochs", 15)
self.learning_rate = self.GetFloatParameter("learning_rate", 0.001)
self.retrain_frequency_months = self.GetIntParameter("retrain_frequency_months", 3)
self.trade_frequency_days = self.GetIntParameter("trade_frequency_days", 5)
# Lower default so trades actually occur.
self.confidence_threshold = self.GetFloatParameter("confidence_threshold", 0.50)
self.target_gross_exposure = self.GetFloatParameter("target_gross_exposure", 1.00)
self.allow_short = self.GetBoolParameter("allow_short", True)
self.use_contrarian_signal = self.GetBoolParameter("use_contrarian_signal", False)
self.rebalance_threshold = self.GetFloatParameter("rebalance_threshold", 0.01)
self.training_lookback = max(150, self.training_lookback)
self.sequence_length = max(10, self.sequence_length)
self.prediction_horizon = max(1, self.prediction_horizon)
self.training_epochs = max(1, self.training_epochs)
self.learning_rate = max(0.00001, self.learning_rate)
self.retrain_frequency_months = max(1, self.retrain_frequency_months)
self.trade_frequency_days = max(1, self.trade_frequency_days)
self.confidence_threshold = max(0.50, min(0.95, self.confidence_threshold))
self.target_gross_exposure = max(0.0, min(2.0, self.target_gross_exposure))
# ------------------------------------------------------------
# Universe
# ------------------------------------------------------------
tickers = ["QQQ", "IVV", "IYR"]
self.symbols = []
for ticker in tickers:
symbol = self.AddEquity(ticker, Resolution.Daily).Symbol
self.symbols.append(symbol)
self.SetBenchmark("IVV")
self.models = {}
self.training_complete = False
self.last_trade_date = None
self.last_train_month_index = None
self.SetWarmUp(self.sequence_length + self.prediction_horizon + 30, Resolution.Daily)
self.Schedule.On(
self.DateRules.MonthStart("IVV"),
self.TimeRules.AfterMarketOpen("IVV", 30),
self.TrainModels
)
self.Schedule.On(
self.DateRules.EveryDay("IVV"),
self.TimeRules.BeforeMarketClose("IVV", 30),
self.Trade
)
def TrainModels(self):
if self.IsWarmingUp:
return
current_month_index = self.Time.year * 12 + self.Time.month
if self.last_train_month_index is not None:
months_since_training = current_month_index - self.last_train_month_index
if months_since_training < self.retrain_frequency_months:
return
self.Debug("CNN training started on " + str(self.Time.date()))
trained_count = 0
for symbol in self.symbols:
history = self.History(symbol, self.training_lookback, Resolution.Daily)
data = self.CleanHistory(history, for_training=True)
if data is None:
self.Debug("Training skipped for " + symbol.Value)
continue
model = CNNModelWrapper(
sequence_length=self.sequence_length,
prediction_horizon=self.prediction_horizon,
epochs=self.training_epochs,
learning_rate=self.learning_rate
)
try:
if model.Train(data):
self.models[symbol] = model
trained_count += 1
self.Debug("Trained CNN for " + symbol.Value)
except Exception as error:
self.Debug("Training failed for " + symbol.Value + ": " + str(error))
self.training_complete = trained_count > 0
if self.training_complete:
self.last_train_month_index = current_month_index
self.Debug("Models trained: " + str(trained_count))
self.Plot("Model Diagnostics", "Models Trained", trained_count)
def Trade(self):
if self.IsWarmingUp:
return
# Important: try to train immediately if no model exists yet.
if not self.training_complete:
self.TrainModels()
if not self.training_complete:
return
if self.last_trade_date is not None:
days_since_last_trade = (self.Time.date() - self.last_trade_date).days
if days_since_last_trade < self.trade_frequency_days:
return
raw_signals = {}
for symbol in self.symbols:
if symbol not in self.models:
continue
# FIX:
# Prediction needs enough rows for feature creation and sequence length.
history = self.History(
symbol,
self.sequence_length + self.prediction_horizon + 30,
Resolution.Daily
)
data = self.CleanHistory(history, for_training=False)
if data is None:
continue
direction, confidence = self.models[symbol].Predict(data)
if confidence < self.confidence_threshold:
raw_signals[symbol] = 0.0
continue
if self.use_contrarian_signal:
direction = -direction
if direction < 0 and not self.allow_short:
raw_signals[symbol] = 0.0
continue
raw_signals[symbol] = direction * confidence
self.Debug(
str(self.Time.date())
+ " | "
+ symbol.Value
+ " direction="
+ str(direction)
+ " confidence="
+ str(round(confidence, 4))
)
self.Plot("Model Confidence", symbol.Value, confidence)
target_weights = self.BuildTargetWeights(raw_signals)
trades_sent = 0
for symbol, target_weight in target_weights.items():
current_weight = self.GetCurrentWeight(symbol)
if abs(target_weight - current_weight) >= self.rebalance_threshold:
self.SetHoldings(symbol, target_weight)
trades_sent += 1
if trades_sent > 0:
self.last_trade_date = self.Time.date()
self.Plot("Strategy Equity", "Portfolio Value", self.Portfolio.TotalPortfolioValue)
self.Plot("Model Diagnostics", "Active Signals", sum(1 for x in raw_signals.values() if abs(x) > 0))
self.Plot("Model Diagnostics", "Trades Sent", trades_sent)
def BuildTargetWeights(self, raw_signals):
target_weights = {}
for symbol in self.symbols:
target_weights[symbol] = 0.0
gross_signal = sum(abs(value) for value in raw_signals.values())
if gross_signal <= 0:
return target_weights
for symbol, signal_value in raw_signals.items():
target_weights[symbol] = self.target_gross_exposure * signal_value / gross_signal
return target_weights
def CleanHistory(self, history, for_training):
if history is None:
return None
if not hasattr(history, "empty"):
return None
if history.empty:
return None
data = history.copy()
if isinstance(data.index, pd.MultiIndex):
try:
data = data.droplevel(0)
except:
pass
required = ["open", "high", "low", "close", "volume"]
for column in required:
if column not in data.columns:
return None
data = data[required].dropna()
if for_training:
minimum_rows = self.sequence_length + self.prediction_horizon + 30
else:
minimum_rows = self.sequence_length + 2
if len(data) < minimum_rows:
return None
return data
def GetCurrentWeight(self, symbol):
if self.Portfolio.TotalPortfolioValue <= 0:
return 0.0
return self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue
def GetIntParameter(self, name, default_value):
value = self.GetParameter(name)
if value is None or value == "":
return default_value
return int(value)
def GetFloatParameter(self, name, default_value):
value = self.GetParameter(name)
if value is None or value == "":
return default_value
return float(value)
def GetBoolParameter(self, name, default_value):
value = self.GetParameter(name)
if value is None or value == "":
return default_value
value = str(value).lower()
if value in ["1", "true", "yes", "y"]:
return True
if value in ["0", "false", "no", "n"]:
return False
return default_value