| Overall Statistics |
|
Total Orders 29911 Average Win 0.09% Average Loss -0.08% Compounding Annual Return 11.704% Drawdown 12.900% Expectancy -0.053 Start Equity 1000000 End Equity 1246636.84 Net Profit 24.664% Sharpe Ratio 0.491 Sortino Ratio 0.536 Probabilistic Sharpe Ratio 21.329% Loss Rate 55% Win Rate 45% Profit-Loss Ratio 1.11 Alpha 0.079 Beta 0.331 Annual Standard Deviation 0.174 Annual Variance 0.03 Information Ratio 0.325 Tracking Error 0.198 Treynor Ratio 0.258 Total Fees $17163.76 Estimated Strategy Capacity $8000.00 Lowest Capacity Asset GE R735QTJ8XC9X Portfolio Turnover 23.64% |
#region imports
from AlgorithmImports import *
from collections import deque
import numpy as np
import scipy as sp
#endregion
class TSZscore_VwapReversion(AlphaModel):
def __init__(self):
self.period = 20
self.securities_list = []
self.day = -1
self.historical_VwapReversion_by_symbol = {}
def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
# Register each security in the universe
for security in changes.added_securities:
if security not in self.securities_list:
self.historical_VwapReversion_by_symbol[security.symbol] = deque(maxlen=self.period)
self.securities_list.append(security)
for security in changes.removed_securities:
if security in self.securities_list:
self.securities_list.remove(security)
def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight)
return []
if self.day == algorithm.time.day: # Only emit insights once per day
return []
self.day = algorithm.time.day
# Neutralize Vwap/Close of securities so it's mean 0, then append them to the list
temp_list = {}
for security in self.securities_list:
if security.Close != 0:
temp_list[security.symbol] = algorithm.vwap(security.symbol).Current.Value/security.Close
else:
temp_list[security.symbol] = 1
temp_mean = sum(temp_list.values())/len(temp_list.values())
for security in self.securities_list:
self.historical_VwapReversion_by_symbol[security.symbol].appendleft(temp_list[security.symbol]-temp_mean)
# Compute ts_zscore of current Vwap/Close
zscore_by_symbol = {}
for security in self.securities_list:
zscore_by_symbol[security.symbol] = sp.stats.zscore(self.historical_VwapReversion_by_symbol[security.symbol])[0]
# create insights to long / short the asset
insights = []
weights = {}
for symbol, zscore in zscore_by_symbol.items():
if not np.isnan(zscore):
weight = zscore
else:
weight = 0
weights[symbol] = weight
# Make scale similar across alphas
abs_weight = {key: abs(val) for key, val in weights.items()}
weights_sum = sum(abs_weight.values())
if weights_sum != 0:
for symbol, weight in weights.items():
weights[symbol] = weight/ weights_sum
for symbol, weight in weights.items():
if weight > 0:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight))
elif weight < 0:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight))
return insights
class TSZscore_DividendGrowth(AlphaModel):
def __init__(self):
self.period = 252
self.day = -1
self.securities_list = []
self.dps = {}
def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
# Register each security in the universe
for security in changes.added_securities:
if security not in self.securities_list:
self.dps[security.symbol] = deque(maxlen=self.period)
self.securities_list.append(security)
for security in changes.removed_securities:
if security in self.securities_list:
self.securities_list.remove(security)
def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight)
return []
if self.day == algorithm.time.day: # Only emit insights once per day
return []
self.day = algorithm.time.day
# Append dividend to the list, compute ts_zscore of current dividend
zscore_by_symbol = {}
for security in self.securities_list:
if not np.isnan(security.fundamentals.earning_reports.dividend_per_share.Value):
self.dps[security.symbol].appendleft(security.fundamentals.earning_reports.dividend_per_share.Value)
zscore_by_symbol[security.symbol] = sp.stats.zscore(self.dps[security.symbol])[0]
# create insights to long / short the asset
insights = []
weights = {}
for symbol, zscore in zscore_by_symbol.items():
if not np.isnan(zscore):
weight = zscore
else:
weight = 0
weights[symbol] = weight
# Make scale similar across alphas
abs_weight = {key: abs(val) for key, val in weights.items()}
weights_sum = sum(abs_weight.values())
if weights_sum != 0:
for symbol, weight in weights.items():
weights[symbol] = weight/ weights_sum
for symbol, weight in weights.items():
if weight >= 0:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight))
else:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight))
return insights
class Conditional_Reversion(AlphaModel):
def __init__(self):
self.condition_period = 5
self.period = 3
self.securities_list = []
self.day = -1
self.historical_volume_by_symbol = {}
self.historical_close_by_symbol = {}
def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
# Register each security in the universe
for security in changes.added_securities:
if security not in self.securities_list:
self.historical_volume_by_symbol[security.symbol] = deque(maxlen=self.condition_period)
self.historical_close_by_symbol[security.symbol] = deque(maxlen=self.period)
self.securities_list.append(security)
for security in changes.removed_securities:
if security in self.securities_list:
self.securities_list.remove(security)
def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight)
return []
if self.day == algorithm.time.day: # Only emit insights once per month
return []
self.day = algorithm.time.day
# Append volume and close to the list
zscore_by_symbol = {}
return_by_symbol = {}
for security in self.securities_list:
if (security.Close != 0 and security.Volume != 0):
self.historical_close_by_symbol[security.symbol].appendleft(security.Close)
self.historical_volume_by_symbol[security.symbol].appendleft(security.Volume)
return_by_symbol[security.symbol] = (self.historical_close_by_symbol[security.symbol][0] - self.historical_close_by_symbol[security.symbol][-1])
if return_by_symbol == {}: # Don't emit insight if there's no valid data
return []
# Rank the 3 days return among securities to return value from 0 to 1
sorted_return_by_symbol = sorted(return_by_symbol.items(), key=lambda x: x[1])
return_rank_by_symbol = {}
for item in sorted_return_by_symbol: # item is a key-value pair. [0] is the security symbol and [1] is the return
return_rank_by_symbol[item[0]] = (sorted_return_by_symbol.index(item))/ len(sorted_return_by_symbol)
# Calculating the final weight
weights = {}
for security in self.securities_list:
# If condition is met, assign weight
if len(self.historical_volume_by_symbol[security.symbol]) != 0 and max(self.historical_volume_by_symbol[security.symbol]) == security.Volume:
weight = -return_rank_by_symbol[security.symbol] # Change this sign and complete different behaviour if purely long. Investigate
else:
weight = 0
weights[security.symbol] = weight
weights_mean = sum(weights.values())/len(weights.values())
for symbol, weight in weights.items():
weights[symbol] = weight - weights_mean
# Make scale similar across alphas
abs_weight = {key: abs(val) for key, val in weights.items()}
weights_sum = sum(abs_weight.values())
if weights_sum != 0:
for symbol, weight in weights.items():
weights[symbol] = weight/ weights_sum
# Create insights to long / short the asset
insights = []
for symbol, weight in weights.items():
if weight > 0:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight))
elif weight < 0:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight))
#Expiry.END_OF_DAY
return insights
def __init__(self):
self.day = -1
self.securities_list = []
self.sentiment_indicator_by_symbol = {}
self.sentiment_by_symbol = {}
self.past_close_by_symbol = {}
def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
# Register each security in the universe
for security in changes.added_securities:
if security not in self.securities_list:
self.securities_list.append(security)
self.sentiment_indicator_by_symbol[security.symbol] = algorithm.add_data(BrainSentimentIndicator7Day, security.symbol)
for security in changes.removed_securities:
if security in self.securities_list:
self.securities_list.remove(security)
self.sentiment_indicator_by_symbol.pop(security.symbol, None)
def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight)
return []
if self.day == algorithm.time.day: # Only emit insights once per day
return []
self.day = algorithm.time.day
# Get sentiment value
for symbol, indicator in self.sentiment_indicator_by_symbol.items():
if slice.ContainsKey(indicator.dataset_symbol):
self.sentiment_by_symbol[symbol] = slice[indicator.dataset_symbol].sentiment
# Calculate dividend growth mean for neutralizing
dps_growth_by_symbol = {}
for security in self.securities_list:
if not np.isnan(security.fundamentals.earning_ratios.dps_growth.Value):
dps_growth_by_symbol[security.symbol] = security.fundamentals.earning_ratios.dps_growth.Value
dps_growth_mean = sum(dps_growth_by_symbol.values())/len(dps_growth_by_symbol.values())
# Neutralized the dividend growth
for security in self.securities_list:
if not np.isnan(security.fundamentals.earning_ratios.dps_growth.Value):
dps_growth_by_symbol[security.symbol] = security.fundamentals.earning_ratios.dps_growth.Value - dps_growth_mean
# Compute ts_zscore of current dividend growth
keys, vals = zip(*dps_growth_by_symbol.items())
z = sp.stats.zscore(vals)
zscore_by_symbol = dict(zip(keys,z))
# create insights to long / short the asset
insights = []
weights = {}
for symbol, zscore in zscore_by_symbol.items():
if not np.isnan(zscore):
weight = zscore
else:
weight = 0
weights[symbol] = weight
for symbol, weight in weights.items():
if weight >= 0:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight))
else:
insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight))
return insightsfrom AlgorithmImports import *
from alpha import *
import tensorflow as tf
import numpy as np
import pandas as pd
class MLPPredictionAlgorithm(QCAlgorithm):
def Initialize(self):
self.set_start_date(2021, 1, 3)
self.set_end_date(2023, 1, 1)
self.set_cash(1_000_000)
# initialize dictionary to hold historical data, lastest features for prediction
self.data = {}
self.latest_features = {}
self.predictions = {}
# initialize data storage for Alpha outputs
self.alpha_outputs = {}
# set stock universe
self.symbols = []
self.UniverseSettings.Resolution = Resolution.Daily
self.add_universe(self.CoarseSelectionFilter)
# add alphas
self.add_alpha(TSZscore_VwapReversion())
# initialize ML model
self.model = None
self.lookback = 19
self.SetWarmUp(self.lookback)
# schedule the training event, and prediction event
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(
12, 0), self.TrainModel)
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(
14, 30), self.MakePredictions)
# set portfolio construction, risk management, and execution
self.set_portfolio_construction(InsightWeightingPortfolioConstructionModel(rebalance=Expiry.EndOfMonth))
self.add_risk_management(NullRiskManagementModel())
self.set_execution(ImmediateExecutionModel())
def CoarseSelectionFilter(self, coarse):
# sort descending by daily dollar volume
sorted_by_dollar_volume = sorted(
coarse, key=lambda x: x.DollarVolume, reverse=True)
# select only Symbols with a price of more than $10 per share
self.symbols = [
c.Symbol for c in sorted_by_dollar_volume if c.Price > 10]
self.symbols = self.symbols[:100]
# initialize the historical data dictionary
for symbol in self.symbols:
if symbol not in self.data:
self.data[symbol] = pd.DataFrame()
return self.symbols
# def OnSecuritiesChanged(self, changes):
# for security in changes.AddedSecurities:
# symbol = security.Symbol
# if symbol not in self.data:
# self.data[symbol] = self.History(
# symbol, self.lookback, Resolution.Daily)
# for security in changes.RemovedSecurities:
# symbol = security.Symbol
# if symbol in self.data:
# del self.data[symbol]
def OnData(self, data):
if self.is_warming_up:
return
for symbol in self.symbols:
if symbol not in data.Bars:
self.Debug("Missing data for symbol: " + str(symbol))
continue
# retrieve the historical data
history = self.History(symbol, timedelta(
days=self.lookback), Resolution.DAILY)
# update self.data with historical data
if not history.empty:
self.data[symbol] = history
def ModelConstruction(self):
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Dense(
64, input_shape=(5,), activation=tf.nn.relu))
model.add(tf.keras.layers.Dense(64, activation=tf.nn.relu))
model.add(tf.keras.layers.Dense(64, activation=tf.nn.relu))
model.add(tf.keras.layers.Dense(1, activation='linear'))
optimizer = tf.keras.optimizers.Adam(0.001)
model.compile(optimizer=optimizer, loss='mse', metrics=['accuracy'])
return model
def TrainModel(self):
X = []
y = []
for symbol in self.symbols:
# self.debug(f"symbol: {symbol}, shape: {self.data[symbol].shape}")
if self.data[symbol].shape[0] == 0:
continue
# drop NA, normalize data
normalized_data = self.data[symbol].dropna()
normalized_data = (normalized_data - normalized_data.mean()) / normalized_data.std()
# self.debug(f"normalized data: {normalized_data.iloc[1, :]}")
for i in range(0, self.data[symbol].shape[0]):
# flatten the 3d standardize data into 2d, lose symbol information, question: what about NA value?
features = normalized_data.iloc[i]
# self.debug(f"symbol: {symbol}, i: {i}, features: {features}")
features = features.values.flatten()
if len(features) == 5:
X.append(features)
else:
self.Debug(f"Inconsistent feature length for {symbol} at index {i}")
for i in range(0, self.data[symbol].shape[0] - 1):
# get the target label (return)
symbol_return = (self.data[symbol].iloc[i + 1]['close'] -
self.data[symbol].iloc[i]['close']) / self.data[symbol].iloc[i]['close']
y.append(symbol_return)
# last row will be stored for the prediction
self.latest_features[symbol] = normalized_data.iloc[-1].values.flatten()
# X discard the last row
X = X[:-1]
# self.debug(f"X: {X}")
# self.debug(f"y: {y}")
# train the MLP model
if len(X) > 0 and len(y) > 0:
X = np.array(X)
y = np.array(y)
X = X.reshape(-1, 5)
# y = y * 10000 # scale the return
self.debug(f"X shape: {X.shape}")
self.debug(f"y length: {len(y)}")
# Train the MLP model
self.model = self.ModelConstruction()
self.model.fit(X, y, epochs=10, batch_size=5, verbose=1)
self.Debug("Model trained with {} samples.".format(len(X)))
else:
self.Debug("Not enough data to train the model.")
def MakePredictions(self):
self.debug("prediction")
if not self.model:
self.Debug("Model is not trained yet")
return
# clear previous predictions
self.predictions.clear()
for symbol in self.symbols:
if symbol not in self.latest_features:
self.debug(f"Missing latest features for symbol {symbol}")
continue
features = self.latest_features[symbol].reshape(1, -1)
# self.debug(f"prediction input shape: {features.shape}")
# self.debug(f"prediction input for {symbol}: {features}")
prediction = self.model.predict(features)
self.predictions[symbol] = prediction[0][0]
self.debug(f"Prediction for symbol {symbol}: {self.predictions[symbol]}")
self.debug(f"Predictions: {self.predictions}")
if len(self.predictions) > 0:
self.TradeOnPredictions()
def TradeOnPredictions(self):
# rank the stocks
sorted_predictions = sorted(self.predictions.items(), key=lambda item: item[1])
n = len(sorted_predictions)
long_threshold = int(n * 0.8)
short_threshold = int(n * 0.2)
for i, (symbol, prediction) in enumerate(sorted_predictions):
if i < short_threshold:
self.SetHoldings(symbol, -1.0 / short_threshold)
self.Debug(f"Short: {symbol}")
elif i >= long_threshold:
self.SetHoldings(symbol, 1.0 / long_threshold)
self.Debug(f"Long: {symbol}")
else:
self.SetHoldings(symbol, 0)
self.Debug(f"No Position: {symbol}")