| Overall Statistics |
|
Total Trades 318 Average Win 0.77% Average Loss -0.83% Compounding Annual Return 48.848% Drawdown 11.500% Expectancy 0.395 Net Profit 64.724% Sharpe Ratio 2.619 Probabilistic Sharpe Ratio 93.832% Loss Rate 28% Win Rate 72% Profit-Loss Ratio 0.93 Alpha 0.324 Beta 0.027 Annual Standard Deviation 0.124 Annual Variance 0.015 Information Ratio 1.671 Tracking Error 0.199 Treynor Ratio 11.788 Total Fees $867.33 Estimated Strategy Capacity $5200000.00 Lowest Capacity Asset BWV XW3T2OYN5MJP |
"""
Big Bertha Strategy with Machine Learning
Last changes:
v0.27: Minimum probability parameter
v0.26: Added both TP and SL capabilities (naive triple barrier targets)
v0.25: Individual probability-based sizing (in addition to general Kelly sizing)
v0.24: Offline model storage
v0.23: Lookback parameter
@version: 0.27
@creation date: 05/07/2022
"""
from AlgorithmImports import *
import pickle
import numpy as np
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import GradientBoostingClassifier
import indicators as idx
from timeseriescv import TimeSeriesSplitGroups
pd.set_option('mode.use_inf_as_na', True)
GROUPER = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")]
AGG_OPS = {"open": "first", "close": "last", "high": "max",
"low": "min", "volume": "sum"}
def catch_errors(func):
def wrap(self, *args, **kwargs):
try:
result = func(self, *args, **kwargs)
return result
except (KeyError, ValueError) as e:
self.print(e)
return
return wrap
class BigBerthaML(QCAlgorithm):
def Initialize(self):
self.min_usd_volume = self.GetParameter("min_usd_volume", 1e9) # Minimum trading volume in previous trading day
self.target_gain = self.GetParameter("target_gain", 0.05) # Minimum target gain to enter the trade
self.capital = self.GetParameter("capital", 80000) # Starting capital
self.lookback = self.GetParameter("lookback", 365) # Trading days used for model training
self.strategy = self.GetParameter("strategy", 0) # -1 short only, +1 long only, 0 long/short
self.benchmark = self.GetParameter("benchmark", "SPY") # Performance benchmark
self.cv_splits = self.GetParameter("cv_splits", 10) # Number of splits for model cross validation
self.store_model = self.GetParameter("store_model", None) # Model name if it needs to be stored
self.sl_retr = self.GetParameter("retracement_sl", 0) # Retracement percentage to use for the Stop Loss, disabled if 0
self.tp_ext = self.GetParameter("extension_tp", 0) # Extension percentage to use for the Take Profit, disabled if 0
self.SetStartDate(2021, 6, 1)
self.SetEndDate(2022, 9, 1)
self.SetCash(self.capital)
self.UniverseSettings.Resolution = Resolution.Minute
self.UniverseSettings.ExtendedMarketHours = True
self.AddUniverse(self.coarse_filter)
self.AddEquity(self.benchmark, Resolution.Minute)
self.SetBenchmark(self.benchmark)
#self.ObjectStore.Delete(self.store_model) # Deleting existing data
if self.store_model is not None and self.ObjectStore.ContainsKey(self.store_model):
self.model = pickle.loads(bytes(self.ObjectStore.ReadBytes(self.store_model)))
else:
self.model = GradientBoostingClassifier(n_iter_no_change=10)
self.model.edge = 0
self.cv = TimeSeriesSplitGroups(n_splits=self.cv_splits)
self.features, self.targets = None, None
at = self.TimeRules.At
every_day = self.DateRules.EveryDay(self.benchmark)
self.Train(self.DateRules.WeekStart(), at(0, 0), self.train_model)
self.Schedule.On(every_day, at(9, 35), self.enter_trades)
self.Schedule.On(every_day, at(15, 55), self.exit_trades)
def coarse_filter(self, coarse):
return [x.Symbol for x in coarse if
x.HasFundamentalData and
x.DollarVolume > self.min_usd_volume]
def train_model(self):
if self.features is None or self.targets is None: return
idx = self.features.index.intersection(self.targets.index) # Removing features without matching targets
idx = idx[idx.get_level_values("time") > self.Time - timedelta(self.lookback)]
self.features = self.features.loc[idx]
self.targets = self.targets.loc[idx]
training_days = idx.get_level_values("time")
if len(training_days.unique()) <= 21: return # Require more than one month of training data
cv_scores = cross_val_score(self.model, X=self.features, y=self.targets,
cv=self.cv, groups=training_days,
scoring="balanced_accuracy")
self.model.fit(self.features, self.targets)
if self.store_model is not None:
self.ObjectStore.SaveBytes(self.store_model, pickle.dumps(self.model))
score = np.mean(cv_scores)
n_classes = len(self.model.classes_)
self.model.edge = (n_classes * score - 1) / (n_classes - 1) # Kelly edge calculation with multiple classes
self.print(f"Training: {self.targets.value_counts()} Edge:{self.model.edge:.1%}")
self.Plot("ML", "Edge", self.model.edge)
def enter_trades(self):
self.store_features()
if self.model.edge <= 0: return
x_pred = self.features.query("time == @self.Time.date()")
x_pred.index = x_pred.index.droplevel("time")
y_proba = pd.DataFrame(self.model.predict_proba(x_pred),
index=x_pred.index,
columns=self.model.classes_)
y_pred = y_proba.idxmax(axis=1)
sizes = (y_proba.max(axis=1) - 0.5).clip(0, 1) * 2 # Selecting only prob > 50% and scaling to 100%
positions = y_pred * sizes * self.model.edge # Sizing based on Kelly and individual probabilty
if sum(abs(positions)) > 1: positions /= sum(abs(positions)) # Ensuring no leverage is used
self.print(f"Trading: {y_pred.value_counts()}")
for symbol, pos in positions[positions != 0].items():
qty = self.CalculateOrderQuantity(symbol, pos)
self.MarketOrder(symbol, qty)
features = x_pred.loc[symbol]
window = (features.bb_high - features.bb_low)
if self.sl_retr > 0:
stop_loss = features.bb_high - window * self.sl_retr if pos > 0 \
else features.bb_low + window * self.sl_retr
self.StopLimitOrder(symbol, -qty, stop_loss, stop_loss)
if self.tp_ext > 0:
take_profit = features.bb_low + window * self.tp_ext if pos > 0 \
else features.bb_high - window * self.tp_ext
self.LimitOrder(symbol, -qty, take_profit)
def exit_trades(self):
self.Transactions.CancelOpenOrders()
self.Liquidate()
self.store_targets()
@catch_errors
def store_features(self):
start = self.Time.replace(hour=7, minute=1, second=0, microsecond=0)
tickers = list(self.ActiveSecurities.Keys)
last_minute = self.Time.replace(second=0, microsecond=0)
minute_bars = self.History(tickers, start, last_minute, Resolution.Minute)
pm_bar = agg_bars(minute_bars, "07:01", "09:30")
entry_hr, entry_mn = last_minute.hour, last_minute.minute
bertha_bar = agg_bars(minute_bars, "09:31", f"{entry_hr}:{entry_mn}")
new_features = bertha_bar.add_prefix("bb_")
new_features.eval("bb_size = (bb_high-bb_low)/bb_open", inplace=True)
new_features.eval("bb_close_range = (bb_close-bb_low)/(bb_high-bb_low)", inplace=True)
new_features.eval("bb_open_range = (bb_open-bb_low)/(bb_high-bb_low)", inplace=True)
new_features["pm_volume_usd"] = pm_bar.eval("close * volume")
yesterday_bar = self.History(tickers, 1, Resolution.Daily)
yesterday_close = yesterday_bar["close"].droplevel("time")
new_features["gap"] = bertha_bar["open"] / yesterday_close - 1
self.features = pd.concat([new_features.dropna(), self.features])
self.print(f"Stored new features, total: {len(self.features)}")
@catch_errors
def store_targets(self):
last_features = self.features.query("time == @self.Time.date()")
self.Log(last_features)
tickers = list(last_features.index.get_level_values("symbol"))
last_minute = self.Time.replace(second=0, microsecond=0)
self.Log(f"Target time: {last_minute}")
minute_bars = self.History(tickers, last_minute - timedelta(minutes=1),
last_minute, Resolution.Minute)
self.Log(minute_bars)
trading_bar = minute_bars.droplevel("time").join(last_features)
new_targets = trading_bar.apply(self.calc_target, axis=1)
self.targets = pd.concat([new_targets.dropna(), self.targets])
self.print(f"Stored new targets, total: {len(self.targets)}")
def calc_target(self, price_bar):
entry_price, exit_price = price_bar.bb_close, price_bar.close
window = (price_bar.bb_high - price_bar.bb_low)
if exit_price >= entry_price * (1 + self.target_gain) and self.strategy >= 0: # long trade
if self.sl_retr > 0:
stop_loss = price_bar.bb_high - window * self.sl_retr
return +1 if price_bar.low > stop_loss else 0 # 1 if profitable long and not touching the SL
else:
return +1
elif exit_price <= entry_price * (1 - self.target_gain) and self.strategy <= 0: # short trade
if self.sl_retr > 0:
stop_loss = price_bar.bb_low + window * self.sl_retr
return -1 if price_bar.high < stop_loss else 0 # -1 if profitable short and not touching the SL
else:
return -1
else:
return 0
def print(self, msg):
self.Debug(f"{self.Time} {msg}")
def agg_bars(minute_bars, start_time, end_time):
filtered_bars = idx.filter_bars(minute_bars, start_time, end_time)
return filtered_bars.groupby(GROUPER).agg(AGG_OPS)