| Overall Statistics |
|
Total Trades 207 Average Win 0.18% Average Loss -0.08% Compounding Annual Return 12.096% Drawdown 17.900% Expectancy 0.251 Net Profit 133.002% Sharpe Ratio 0.937 Probabilistic Sharpe Ratio 37.084% Loss Rate 60% Win Rate 40% Profit-Loss Ratio 2.11 Alpha 0.053 Beta 0.359 Annual Standard Deviation 0.092 Annual Variance 0.009 Information Ratio -0.056 Tracking Error 0.122 Treynor Ratio 0.241 Total Fees $259.99 Estimated Strategy Capacity $33000000.00 Lowest Capacity Asset HAL R735QTJ8XC9X |
from AlgorithmImports import *
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import mean_squared_error as MSE
import sklearn as sk
import xgboost as xgb
import pandas as pd
import numpy as np
from QuantConnect.Algorithm.Framework.Portfolio import PortfolioTarget
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from QuantConnect import *
from risk import BracketRiskModel
class ModulatedNadionReplicator(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 1, 1) # Set Start Date
self.SetCash(100000) # Set Strategy Cash
# the coarse universe selection model that takes top volume stocks over 5,000,000 in volume and between $20 - $200
# then in the fine universe selection it gets the top stocks sorted by the Earnings Filing Dates fundemental metric
# set the resolution to be hour
self.__numberOfSymbols = 30
self.__numberOfSymbolsFine = 10
self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))
self.UniverseSettings.Resolution = Resolution.Hour
# standard immidiate execution model
self.SetExecution(ImmediateExecutionModel())
# insight weighted portfolio model with 5% of the portfolio liquid
self.Settings.FreePortfolioValuePercentage = 0.05
self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel())
# custom bracket risk model, trailing stop with profit taker
self.up_value = 0.15
self.down_value = 0.05
self.SetRiskManagement(BracketRiskModel(self.down_value, self.up_value))
# standard alpha streams brokerage model
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
# get ready the symbol data dictionary to hold our data passed from the SymbolData class
self.symbolDataBySymbol = {}
self.isTrained = False
self.symbolFlag = {}
# global dictionary to store our models
self.models = {}
# the market symbol in this example "SPY"
self.AddEquity("SPY")
# training and scheduling methods
self.Train(self.TrainingMethod)
self.Train(self.DateRules.MonthEnd(), self.TimeRules.At(8,0), self.TrainingMethod)
self.Schedule.On(self.DateRules.EveryDay('SPY'), self.TimeRules.AfterMarketOpen('SPY', 30), self.Predict)
# coarse universe
def CoarseSelectionFunction(self, coarse):
# rebalance logic for a one month rebalance at midnight
if not self.Time.day % 30 == 1 and not self.Time.hour % 24 == 1: return self.Universe.Unchanged
# sort descending by daily dollar volume
sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
filtered = [ x.Symbol for x in sortedByDollarVolume if x.HasFundamentalData and x.Price >= 20 and x.Price <= 200 and x.DollarVolume > 5000000 ]
return [ x for x in filtered[:self.__numberOfSymbols] ]
# fine universe
def FineSelectionFunction(self, fine):
# sort by most recent earnings dates
sortedByEarnings = sorted(fine, key=lambda x: x.EarningReports.FileDate, reverse=False)[:self.__numberOfSymbolsFine] # sort by most recent earnings dates
return [ x.Symbol for x in sortedByEarnings ]
def TrainingMethod(self):
# lets the predict function know when the model has been trained
self.isTrained = False
scores = []
# go through the dictionary of symbol data we recieved from our SymbolData class
for symbol, symbolData in self.symbolDataBySymbol.items():
try:
# don't run the model on the "SPY" symbol due to this being our market indicator symbol
if str(symbol) == 'SPY': break
self.symbolFlag[symbol] = False
# wait until the indicator data is ready
if not (symbolData.atr.IsReady and symbolData.rsi.IsReady and symbolData.market_rocp.IsReady and symbolData.rocp.IsReady): continue
# load up the indicator values into numpy arrays, as this is the format we will need for the xgboost regressor
market_rocp = np.array([x.Value for x in symbolData.market_rocpWin]).reshape(-1,1)
rocp = np.array([x.Value for x in symbolData.rocpWin]).reshape(-1,1)
rsi = np.array([x.Value for x in symbolData.rsiWin]).reshape(-1,1)
atr = np.array([x.Value for x in symbolData.atrWin]).reshape(-1,1)
# scale the data down for better feature quality
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit(rocp)
scaled_rocp = scaler.transform(rocp)
scaler.fit(atr)
scaled_atr = scaler.transform(atr)
scaler.fit(market_rocp)
scaled_market_rocp = scaler.transform(market_rocp)
scaler.fit(rsi)
scaled_rsi = scaler.transform(rsi)
# create our indicator dataframe to feed into our train_test_split funtion
indicator_df = pd.DataFrame(np.hstack((scaled_market_rocp, scaled_rsi, scaled_atr)))
# split our data into training and testing groups for our model
x_train, x_valid, y_train, y_valid = train_test_split(indicator_df, scaled_rocp, test_size=0.35, random_state=42)
# fit the model to the data with the eval set with the parameters below passed to the random search optimizer with 5 iterations and a cv of 4
parameters = {
'n_estimators': [100, 200, 300, 400],
'learning_rate': [0.001, 0.005, 0.01, 0.05],
'max_depth': [8, 10, 12, 15],
'gamma': [0.001, 0.005, 0.01, 0.02],
'random_state': [42]
}
eval_set = [(x_train, y_train), (x_valid, y_valid)]
self.models[symbol] = xgb.XGBRegressor(eval_set=eval_set, objective='reg:squarederror', verbose=False)
self.models[symbol] = RandomizedSearchCV(estimator=self.models[symbol], param_distributions=parameters, n_iter=5, scoring='neg_mean_squared_error', cv=4, verbose=1)
self.models[symbol].fit(x_train, y_train, eval_set=eval_set, verbose=False)
y_pred = self.models[symbol].predict(x_valid)
score = np.sqrt(MSE(y_valid, y_pred))
# you can always experiment with the more ways of setting your trained flag like below
# scores.append(score)
# if len(scores) > 2:
# scores_avg = sum(scores) / len(scores)
# if score < scores_avg:
# self.symbolFlag[symbol] = True
# self.Debug(f'Scores Average: {scores_avg}')
self.symbolFlag[symbol] = True
self.Debug(f'Score: {score}')
self.Debug(f'Trained Model: {symbol}')
self.Debug("Models Trained")
except:
self.Debug(f"error_training {symbol}")
self.isTrained = True
return
def Predict(self):
# only predict if the model is trained
if self.isTrained == True:
# go through our symbol data from the SymbolData class
for symbol, symbolData in self.symbolDataBySymbol.items():
try:
# don't run the model on the "SPY" symbol due to this being our market indicator symbol
if str(symbol) == 'SPY': break
if symbol in self.symbolFlag and self.symbolFlag[symbol] == True:
# wait until the indicator data is ready
if not (symbolData.atr.IsReady and symbolData.rsi.IsReady and symbolData.market_rocp.IsReady and symbolData.rocp.IsReady): continue
# load up the indicator values into numpy arrays, as this is the format we will need for the xgboost regressor
market_rocp = np.array([x.Value for x in symbolData.market_rocpWin]).reshape(-1,1)
rocp = np.array([x.Value for x in symbolData.rocpWin]).reshape(-1,1)
rsi = np.array([x.Value for x in symbolData.rsiWin]).reshape(-1,1)
atr = np.array([x.Value for x in symbolData.atrWin]).reshape(-1,1)
# scale the data down for better feature quality
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit(rocp)
scaled_rocp = scaler.transform(rocp)
scaler.fit(atr)
scaled_atr = scaler.transform(atr)
scaler.fit(market_rocp)
scaled_market_rocp = scaler.transform(market_rocp)
scaler.fit(rsi)
scaled_rsi = scaler.transform(rsi)
# create our indicator dataframe to feed into our train_test_split funtion
indicator_df = pd.DataFrame(np.hstack((scaled_market_rocp, scaled_rsi, scaled_atr)))
# make sure our model exists
if symbol in self.models:
# load our model that we know is ready due to our flag for the model being tained is true
model = self.models[symbol]
# predict on new data with our model
model_predict = model.predict(indicator_df)
# get the most current prediction we will use for our insight logic
magnitude = round(float(model_predict[-1]), 2)
# get some important metrics we can use for our insights
market_avg = (sum(scaled_market_rocp) / len(scaled_market_rocp))
rocp_avg = (sum(scaled_rocp) / len(scaled_rocp))
mag = abs(magnitude)
# direction starts flat
direction = InsightDirection.Flat
self.Debug(f"Predicted Model: {symbol}")
self.Debug(f'prediction: {magnitude}')
self.Debug(f'rocp_avg: {rocp_avg}')
self.Debug(f'market_avg: {market_avg}')
# you could change your risk values to be more dynamic values
self.down_value = abs(market_avg) * 0.2
self.up_value = mag * 0.8
# if prediction is above 0.5%, go long
if magnitude > 0.5:
direction = InsightDirection.Up
# if the prediction is below -0.5%, go short
if magnitude < -0.5:
direction = InsightDirection.Down
self.Debug(f'Direction: {str(direction)}')
# emit our signal using the rocp_avg as the expected move, the market_avg as the confidence,
# and the absolute magnitude multiplied by .3 as our weight
self.EmitInsights(Insight.Price(symbol, timedelta(hours = 24), direction, rocp_avg, market_avg, None, (.3 * mag)))
except:
self.Debug(f"issue trading {symbol}")
return
# on symbol change we need to remove the symbols that are dropping off our universe from our subscription manager
# alternatively we call our history data, instantiate our SymbolData class, and update our indcator data
def OnSecuritiesChanged(self, changes):
symbols = [ x.Symbol for x in changes.RemovedSecurities ]
if len(symbols) > 0:
for subscription in self.SubscriptionManager.Subscriptions:
if subscription.Symbol in symbols:
self.symbolDataBySymbol.pop(subscription.Symbol, None)
if subscription.Symbol in self.models:
self.models.pop(subscription.Symbol, None)
if subscription.Symbol in self.symbolFlag:
self.symbolFlag.pop(subscription.Symbol, None)
subscription.Consolidators.Clear()
addedSymbols = [ x.Symbol for x in changes.AddedSecurities if x.Symbol not in self.symbolDataBySymbol]
if len(addedSymbols) == 0: return
history = self.History(addedSymbols, 24, Resolution.Hour)
for symbol in addedSymbols:
symbolData = SymbolData(symbol, self)
self.symbolDataBySymbol[symbol] = symbolData
if not history.empty:
ticker = SymbolCache.GetTicker(symbol)
if ticker not in history.index.levels[0]:
continue
for tuple in history.loc[ticker].itertuples():
symbolData.market_rocp.Update(tuple.Index, tuple.close)
symbolData.rocp.Update(tuple.Index, tuple.close)
symbolData.rsi.Update(tuple.Index, tuple.close)
for bar in history.itertuples():
tradebar = TradeBar(bar.Index[1], symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
symbolData.atr.Update(tradebar)
# SymbolData class that is used to get us our indicator values
class SymbolData:
'''Contains data
specific to a symbol required by this model'''
def __init__(self, symbol, algorithm):
self.Insight = None
self.Symbol = symbol
self.market_rocp = algorithm.ROCP('SPY', 24)
self.rocp = algorithm.ROCP(symbol, 24)
self.rsi = algorithm.RSI(symbol,12, MovingAverageType.Simple)
self.atr = algorithm.ATR(symbol, 24)
self.window_length = 24
self.market_rocp.Updated += self.market_RocpUpdated
self.market_rocpWin = RollingWindow[IndicatorDataPoint](self.window_length)
self.rocp.Updated += self.RocpUpdated
self.rocpWin = RollingWindow[IndicatorDataPoint](self.window_length)
self.rsi.Updated += self.RsiUpdated
self.rsiWin = RollingWindow[IndicatorDataPoint](self.window_length)
self.atr.Updated += self.AtrUpdated
self.atrWin = RollingWindow[IndicatorDataPoint](self.window_length)
def market_RocpUpdated(self, sender, updated):
self.market_rocpWin.Add(updated)
def RocpUpdated(self, sender, updated):
self.rocpWin.Add(updated)
def RsiUpdated(self, sender, updated):
self.rsiWin.Add(updated)
def AtrUpdated(self, sender, updated):
self.atrWin.Add(updated)
#region imports
from AlgorithmImports import *
#endregion
from QuantConnect.Algorithm.Framework.Risk import RiskManagementModel
# bracket risk model class
class BracketRiskModel(RiskManagementModel):
'''Creates a trailing stop loss for the maximumDrawdownPercent value and a profit taker for the maximumUnrealizedProfitPercent value'''
def __init__(self, maximumDrawdownPercent = 0.05, maximumUnrealizedProfitPercent = 0.05):
self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
self.trailingHighs = dict()
self.maximumUnrealizedProfitPercent = abs(maximumUnrealizedProfitPercent)
def ManageRisk(self, algorithm, targets):
riskAdjustedTargets = list()
for kvp in algorithm.Securities:
symbol = kvp.Key
security = kvp.Value
# Remove if not invested
if not security.Invested:
self.trailingHighs.pop(symbol, None)
continue
pnl = security.Holdings.UnrealizedProfitPercent
if pnl > self.maximumUnrealizedProfitPercent:
# liquidate
algorithm.Debug(f"Profit Taken: {security.Symbol}")
algorithm.Log(f"Profit Taken: {security.Symbol}")
riskAdjustedTargets.append(PortfolioTarget(security.Symbol, 0))
return riskAdjustedTargets
# Add newly invested securities
if symbol not in self.trailingHighs:
self.trailingHighs[symbol] = security.Holdings.AveragePrice # Set to average holding cost
continue
# Check for new highs and update - set to tradebar high
if self.trailingHighs[symbol] < security.High:
self.trailingHighs[symbol] = security.High
continue
# Check for securities past the drawdown limit
securityHigh = self.trailingHighs[symbol]
drawdown = (security.Low / securityHigh) - 1
if drawdown < self.maximumDrawdownPercent:
# liquidate
algorithm.Debug(f"Losses Taken: {security.Symbol}")
algorithm.Log(f"Losses Taken: {security.Symbol}")
riskAdjustedTargets.append(PortfolioTarget(symbol, 0))
return riskAdjustedTargets