| Overall Statistics | |
| --- | --- |
| Total Trades | 6134 |
| Average Win | 0.26% |
| Average Loss | -0.21% |
| Compounding Annual Return | 98.871% |
| Drawdown | 16.500% |
| Expectancy | 0.108 |
| Net Profit | 98.871% |
| Sharpe Ratio | 2.818 |
| Probabilistic Sharpe Ratio | 92.332% |
| Loss Rate | 49% |
| Win Rate | 51% |
| Profit-Loss Ratio | 1.19 |
| Alpha | 0.704 |
| Beta | -0.256 |
| Annual Standard Deviation | 0.232 |
| Annual Variance | 0.054 |
| Information Ratio | 1.702 |
| Tracking Error | 0.267 |
| Treynor Ratio | -2.546 |
| Total Fees | $15815.07 |
| Estimated Strategy Capacity | $24000000.00 |
| Lowest Capacity Asset | CDW VHRARJ4RLSV9 |
"""
Library of indicators
@version: 0.11
"""
import pandas as pd
def filter_bars(bars, start, end):
    """Keep only the rows whose time of day lies between ``start`` and ``end``.

    The time of day is read from the "time" level of the index and the
    matching rows are selected by position.
    """
    timestamps = bars.index.get_level_values("time")
    row_positions = timestamps.indexer_between_time(start, end)
    return bars.iloc[row_positions]
def rename(bars, name):
    """Label ``bars`` with ``name``.

    A Series is renamed to ``name``; anything else gets ``name_`` prepended
    to each of its column labels.
    """
    if isinstance(bars, pd.Series):
        return bars.rename(name)
    return bars.add_prefix(f"{name}_")
def get_daygrouper():
    """Return the groupers that split data by symbol and by 1-day bins of "time"."""
    by_symbol = pd.Grouper(level="symbol")
    by_day = pd.Grouper(level="time", freq="1D")
    return [by_symbol, by_day]
# Daily indicators
def roll_max(bars, window, groupby="symbol"):
    """Rolling maximum over ``window`` rows, computed separately per group.

    Partial windows are allowed (``min_periods=1``), so the first rows of
    each group already carry a value.
    """
    def _rolling_max(group):
        return group.rolling(window, min_periods=1).max()

    return bars.groupby(groupby).apply(_rolling_max)
def roll_min(bars, window, groupby="symbol"):
    """Rolling minimum over ``window`` rows, computed separately per group.

    Uses the default ``min_periods``, so rows without a full window are NaN.
    """
    def _rolling_min(group):
        return group.rolling(window).min()

    grouped = bars.groupby(groupby)
    return grouped.apply(_rolling_min)
def roll_average(bars, window, groupby="symbol", mean_type="arit"):
    """Rolling average per group.

    ``mean_type="exp"`` selects an exponentially weighted mean with
    ``span=window``; any other value selects the plain rolling mean over
    ``window`` rows.
    """
    def _exponential(group):
        return group.ewm(span=window).mean()

    def _arithmetic(group):
        return group.rolling(window).mean()

    if mean_type == "exp":
        averager = _exponential
    else:
        averager = _arithmetic
    return bars.groupby(groupby).apply(averager)
def roll_range(bars, window):
    """Position of the rolling average close inside the rolling high-low range.

    Computed as (average close - lowest low) / (highest high - lowest low),
    each taken over ``window`` rows per symbol.
    """
    highest = roll_max(bars["high"], window).squeeze()
    lowest = roll_min(bars["low"], window).squeeze()
    mean_close = roll_average(bars["close"], window).squeeze()
    span = highest - lowest
    return (mean_close - lowest) / span
def roll_change(bars, window):
    """Percentage change over ``window`` rows, computed within each symbol."""
    by_symbol = bars.groupby("symbol")
    return by_symbol.pct_change(window)
def position_range(bars, window):
    """Position of the open inside the high-low range of the preceding rows.

    The range is built from bars shifted by one row per symbol, so the
    current row (the trading date) is not part of it.
    """
    prior_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    highest = roll_max(prior_bars["high"], window).squeeze()
    lowest = roll_min(prior_bars["low"], window).squeeze()
    distance_from_low = bars["open"] - lowest
    return distance_from_low / (highest - lowest)
def gap(bars):
    """Relative change from the previous row's close to the current open, per symbol."""
    prior_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    prior_close = prior_bars["close"]
    return bars["open"] / prior_close - 1
def extension(bars, window):
    """Distance of the high from the rolling highest high, in range units.

    Computed as (high - highest high) / (highest high - lowest low), with
    both extremes taken over ``window`` rows per symbol.
    """
    max_high = roll_max(bars["high"], window).squeeze()
    # The lowest low needs the rolling minimum; the rolling maximum of the
    # lows was used before, which made the denominator the wrong range.
    min_low = roll_min(bars["low"], window).squeeze()
    return (bars["high"] - max_high) / (max_high - min_low)
def retracement(bars, window):
    """Distance of the low from the rolling highest high, in range units.

    Computed as (highest high - low) / (highest high - lowest low), with
    both extremes taken over ``window`` rows per symbol.
    """
    max_high = roll_max(bars["high"], window).squeeze()
    # The lowest low needs the rolling minimum; the rolling maximum of the
    # lows was used before, which made the denominator the wrong range.
    min_low = roll_min(bars["low"], window).squeeze()
    return (max_high - bars["low"]) / (max_high - min_low)
def gap_extension(bars):
    """Ratio of (previous high - open) to the gap (open - previous close), per symbol."""
    prior_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    to_prior_high = prior_bars["high"] - bars["open"]
    gap_size = bars["open"] - prior_bars["close"]
    return to_prior_high / gap_size
def day_range(bars):
    """Position of the open inside each row's own low-high range."""
    expression = "(open-low)/(high-low)"
    return bars.eval(expression)
def gap_retracement(bars):
    """Ratio of (open - previous low) to the gap (open - previous close), per symbol."""
    prior_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    above_prior_low = bars["open"] - prior_bars["low"]
    gap_size = bars["open"] - prior_bars["close"]
    return above_prior_low / gap_size
def roll_vwap(bars, window):
    """Rolling volume-weighted average price per symbol.

    The price of a row is the mean of its high, low and close. Price times
    volume and volume are both summed over ``window`` rows (partial windows
    allowed) and then divided.
    """
    def _rolling_sum(group):
        return group.rolling(window, min_periods=1).sum()

    typical_price = bars[["high", "low", "close"]].mean(axis=1)
    traded_value = typical_price * bars["volume"]
    value_sum = traded_value.groupby("symbol").apply(_rolling_sum)
    volume_sum = bars["volume"].groupby("symbol").apply(_rolling_sum)
    return value_sum / volume_sum
def shift(bars, shift):
    """Shift ``bars`` by ``shift`` rows inside each symbol group."""
    by_symbol = bars.groupby("symbol")
    return by_symbol.shift(shift)
def divergence(num_bars, den_bars):
    """Relative difference of ``num_bars`` versus ``den_bars`` (ratio minus one)."""
    ratio = num_bars / den_bars
    return ratio - 1
# Intra day indicators
def intra_change(bars):
    """Relative change from the first to the last value of each symbol/day group."""
    by_day = bars.groupby(get_daygrouper())
    closing_values = by_day.last()
    opening_values = by_day.first()
    return closing_values / opening_values - 1
def intra_vwap(bars):
    """Cumulative volume-weighted average price per symbol.

    The price of a row is (high + low + close) / 3. Both running sums are
    accumulated per symbol over all rows supplied.
    """
    traded_value = bars.eval("(high + low + close)/3 * volume")
    running_value = traded_value.groupby("symbol").cumsum()
    running_volume = bars["volume"].groupby("symbol").cumsum()
    return running_value / running_volume
def intra_average(bars):
    """Arithmetic mean of ``bars`` for each symbol/day group.

    pandas group-by objects expose ``mean()``; there is no ``average()``
    method, so the previous call raised ``AttributeError``.
    """
    return bars.groupby(get_daygrouper()).mean()
def intra_max(bars):
    """Maximum of ``bars`` for each symbol/day group."""
    by_day = bars.groupby(get_daygrouper())
    return by_day.max()
def intra_min(bars):
    """Minimum of ``bars`` for each symbol/day group."""
    by_day = bars.groupby(get_daygrouper())
    return by_day.min()
def intra_gapext(daily_bars, intra_bars):  # Gap Extension
    """Ratio of (intraday highest high - daily open) to the daily gap.

    The gap is the daily open minus the previous daily close of the same
    symbol.
    """
    prior_close = daily_bars["close"].groupby("symbol").shift(1)
    gap_size = daily_bars["open"] - prior_close
    move_above_open = intra_max(intra_bars["high"]) - daily_bars["open"]
    return move_above_open.divide(gap_size, axis="index")
def intra_highext(daily_bars, intra_bars):  # Total High Extension
    """Ratio of (daily high - intraday highest high) to the intraday high-low range."""
    session_high = intra_max(intra_bars["high"])
    session_low = intra_min(intra_bars["low"])
    session_range = session_high - session_low
    beyond_session_high = daily_bars["high"] - session_high
    return beyond_session_high.divide(session_range, axis="index")
def intra_retrace(bars):  # Retrace
    """Change in high from the first to the last bar of each symbol/day group.

    The change is expressed in units of the first bar's high-low range.
    """
    by_day = bars.groupby(get_daygrouper())
    first_bar = by_day.first()
    last_bar = by_day.last()
    first_bar_range = first_bar["high"] - first_bar["low"]
    return (last_bar["high"] - first_bar["high"]) / first_bar_range
def intra_divup(bars):  # Divergence Up
    """Relative distance of each bar's high above the value of ``intra_vwap``."""
    reference = intra_vwap(bars)
    above_reference = bars["high"] - reference
    return above_reference / reference
def intra_divdown(bars):  # Divergence Down
    """Relative distance of each bar's low below the value of ``intra_vwap``."""
    reference = intra_vwap(bars)
    below_reference = reference - bars["low"]
    return below_reference / reference
def intra_position_range(bars):  # Position Range
    """Position of the last close inside the high-low range of each symbol/day group."""
    by_day = bars.groupby(get_daygrouper())
    last_close = by_day["close"].last()
    lowest = by_day["low"].min()
    highest = by_day["high"].max()
    return (last_close - lowest) / (highest - lowest)
def intra_relvolume(daily_bars, intra_bars, avg_days=10):
    """Intraday volume of each symbol/day relative to the prior average daily volume.

    The average is the rolling mean of daily volume over ``avg_days`` rows,
    shifted by one row per symbol.
    """
    by_day = intra_bars.groupby(get_daygrouper())
    session_volume = by_day["volume"].sum()
    rolling_volume = roll_average(daily_bars["volume"], avg_days)
    # Shift 1 day later to match with intra-day data
    prior_avg_volume = shift(rolling_volume, 1)
    return session_volume / prior_avg_volume.squeeze()
def intra_volume_hod(bars):
    """Cumulative volume per symbol/day group, read at a per-group index label.

    For every group the label is the second entry of ``idxmax()``.
    NOTE(review): presumably that entry is the "high" column, which would
    make this the volume traded up to the high of the day - confirm against
    the column order of the callers' data.
    """
    by_day = bars.groupby(get_daygrouper())
    peak_labels = by_day.apply(lambda day: day.idxmax()[1])
    running_volume = by_day["volume"].cumsum()
    return running_volume[peak_labels].groupby(get_daygrouper()).last()


"""
Big Bertha Strategy with Machine Learning
Done
- Custom precision scoring
- New Features (bb volume and open)
- Offline data storage to avoid symbols limitation
- Trade execution on high probability trades
- Double barrier target with SL
@version: 0.19
@creation date: 05/07/2022
"""
from AlgorithmImports import *
import numpy as np
import pandas as pd
from ast import literal_eval
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import GradientBoostingClassifier
import indicators as idx
from timeseriescv import TimeSeriesSplitGroups
# pandas option: treat +inf/-inf as missing values, so that dropna() also
# discards rows where a ratio divided by zero.
pd.set_option('mode.use_inf_as_na', True)
# Groups rows by symbol and by 1-day bins of the "time" index level.
GROUPER = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")]
# Per-column aggregation that collapses several bars into one OHLCV bar.
AGG_OPS = {"open": "first", "close": "last", "high": "max",
"low": "min", "volume": "sum"}
class BigBerthaML(QCAlgorithm):
    """Intraday algorithm that trades a classifier's prediction per symbol.

    On every trading day of the benchmark:

    - at 09:35 the 09:31-09:35 bar (columns prefixed ``bb_``), the
      pre-market dollar volume and the opening gap are stored as features,
      and the model's predictions for those rows are traded;
    - at 15:55 open orders are cancelled, positions are liquidated and the
      outcome of the 09:36-15:55 session is stored as the target.

    The classifier is re-trained at the start of each week on all stored
    feature/target rows.
    """

    def Initialize(self):
        """Read parameters, configure backtest, universe, model and schedule."""
        self.min_usd_volume = literal_eval(self.GetParameter("min_usd_volume"))
        self.retracement_sl = literal_eval(self.GetParameter("retracement_sl"))
        self.kelly_frac = literal_eval(self.GetParameter("kelly_frac"))
        self.capital = literal_eval(self.GetParameter("capital"))
        self.use_sl = literal_eval(self.GetParameter("use_sl"))
        # Kept as raw strings: exposure is tested for "long"/"short" substrings
        self.exposure = self.GetParameter("exposure")
        self.benchmark = self.GetParameter("benchmark")

        self.SetStartDate(2021, 1, 1)
        self.SetEndDate(2022, 1, 1)
        self.SetCash(self.capital)
        self.UniverseSettings.Resolution = Resolution.Minute
        self.UniverseSettings.ExtendedMarketHours = True
        self.AddUniverse(self.coarse_filter)
        self.AddEquity(self.benchmark, Resolution.Minute)
        self.SetBenchmark(self.benchmark)

        # A score of 0 means "do not trade"; train_model overwrites it
        self.score = 0
        self.cv = TimeSeriesSplitGroups(n_splits=10)
        self.features, self.targets = None, None
        self.model = GradientBoostingClassifier(n_iter_no_change=3)

        at = self.TimeRules.At
        every_day = self.DateRules.EveryDay(self.benchmark)
        self.Train(self.DateRules.WeekStart(), at(0, 0), self.train_model)
        self.Schedule.On(every_day, at(9, 35), self.store_features)
        self.Schedule.On(every_day, at(9, 35), self.trade)
        self.Schedule.On(every_day, at(15, 55), self.stop_trading)
        self.Schedule.On(every_day, at(15, 55), self.store_targets)

    def coarse_filter(self, coarse):
        """Select symbols with fundamental data and enough dollar volume."""
        return [x.Symbol for x in coarse if
                x.HasFundamentalData and
                x.DollarVolume > self.min_usd_volume]

    def train_model(self):
        """Cross-validate and re-fit the model on the stored rows.

        Does nothing until both features and targets exist and at least 20
        distinct days are available.
        """
        if self.features is None or self.targets is None:
            return
        # Features and targets come from separate history requests and are
        # filtered independently, so keep only the rows present in both
        # before pairing them as X and y.
        features, targets = self.features.align(self.targets, join="inner",
                                                axis=0)
        training_days = features.index.get_level_values("time").unique()
        if len(training_days) < 20:
            return
        time_groups = targets.index.get_level_values("time")
        cv_scores = cross_val_score(self.model, X=features, y=targets,
                                    cv=self.cv, groups=time_groups,
                                    scoring="balanced_accuracy")
        # `nan` must be passed by keyword: the second positional argument of
        # nan_to_num is `copy`, not the replacement value.
        self.score = np.mean(np.nan_to_num(cv_scores, nan=0.0))
        self.model.fit(features, targets)
        self.print(f"Training: {targets.value_counts()} Score:{self.score:.1%}")
        self.Plot("ML", "Score", self.score)

    def trade(self):
        """Open positions for today's predictions, sized by the CV score."""
        if self.score == 0:
            return
        n_classes = len(self.model.classes_)
        # Rescale the score so that chance level (1 / n_classes) maps to 0
        edge = (n_classes * self.score - 1) / (n_classes - 1)
        x_pred = self.features.query("time == @self.Time.date()")
        if x_pred.empty:
            return  # No features stored today, nothing to predict
        x_pred.index = x_pred.index.droplevel("time")
        y_pred = pd.Series(self.model.predict(x_pred), index=x_pred.index)
        positions = y_pred * np.clip(edge * self.kelly_frac, 0, 1)
        gross_exposure = positions.abs().sum()
        if gross_exposure > 1:
            positions = positions / gross_exposure  # Ensuring no leverage is used
        self.print(f"Trading: {y_pred.value_counts()}")
        for symbol, pos in positions[positions != 0].items():
            qty = self.CalculateOrderQuantity(symbol, pos)
            self.MarketOrder(symbol, qty)
            if self.use_sl:
                features = x_pred.loc[symbol]
                stop_loss = self._stop_loss_price(features.bb_high,
                                                  features.bb_low, pos > 0)
                self.StopLimitOrder(symbol, -qty, stop_loss, stop_loss)

    def _stop_loss_price(self, bb_high, bb_low, is_long):
        """Stop price placed ``retracement_sl`` of the bb range inside the bar.

        Measured down from ``bb_high`` for longs and up from ``bb_low`` for
        shorts.
        """
        window = (bb_high - bb_low) * self.retracement_sl
        return bb_high - window if is_long else bb_low + window

    def stop_trading(self):
        """Cancel every open order and liquidate all positions."""
        self.Transactions.CancelOpenOrders()
        self.Liquidate()

    def store_features(self):
        """Build today's feature rows and prepend them to ``self.features``."""
        start = self.Time.replace(hour=7, minute=1, second=0)
        tickers = list(self.ActiveSecurities.Keys)
        minute_bars = self.History(tickers, start, self.Time, Resolution.Minute)
        pm_bar = agg_bars(minute_bars, "07:01", "09:30")
        min5_bar = agg_bars(minute_bars, "09:31", "09:35")
        new_features = min5_bar.add_prefix("bb_")
        new_features.eval("bb_size = (bb_high-bb_low)/bb_open", inplace=True)
        new_features.eval("bb_close_range = (bb_close-bb_low)/(bb_high-bb_low)", inplace=True)
        new_features.eval("bb_open_range = (bb_open-bb_low)/(bb_high-bb_low)", inplace=True)
        new_features["pm_volume_usd"] = pm_bar.eval("close * volume")
        yesterday_bar = self.History(tickers, 1, Resolution.Daily)
        yesterday_close = yesterday_bar["close"].droplevel("time")
        new_features["gap"] = min5_bar["open"] / yesterday_close - 1
        self.features = pd.concat([new_features.dropna(), self.features])
        self.print(f"Stored new features, total: {len(self.features)}")

    def store_targets(self):
        """Label today's feature rows and prepend the labels to ``self.targets``."""
        if self.features is None:
            return
        last_features = self.features.query("time == @self.Time.date()")
        if last_features.empty:
            return  # Nothing was stored this morning, so nothing to label
        tickers = list(last_features.index.get_level_values("symbol"))
        start = self.Time.replace(hour=9, minute=31, second=0)
        end = self.Time.replace(hour=15, minute=55, second=0)
        minute_bars = self.History(tickers, start, end, Resolution.Minute)
        min5_bar = agg_bars(minute_bars, "09:31", "09:35")
        trading_bar = agg_bars(minute_bars, "09:36", "15:55")
        trading_bar = trading_bar.join(min5_bar.add_prefix("bb_"))
        new_targets = trading_bar.apply(self.calc_exit_target, axis=1)
        self.targets = pd.concat([new_targets.dropna(), self.targets])
        self.print(f"Stored new targets, total: {len(self.targets)}")

    def calc_exit_target(self, price_bar):
        """Classify one session bar as +1 (long), -1 (short) or 0 (no trade).

        A long needs a close more than 1% above the open and "long" in the
        exposure; a short needs a close more than 1% below the open and
        "short" in the exposure. With ``use_sl`` enabled the label falls
        back to 0 when the session touched the stop-loss price.
        """
        if price_bar.close > price_bar.open * 1.01 and "long" in self.exposure:
            if not self.use_sl:
                return +1
            stop_loss = self._stop_loss_price(price_bar.bb_high,
                                              price_bar.bb_low, True)
            # 1 if profitable long and not touching the SL
            return +1 if price_bar.low > stop_loss else 0
        if price_bar.close < price_bar.open * 0.99 and "short" in self.exposure:
            if not self.use_sl:
                return -1
            stop_loss = self._stop_loss_price(price_bar.bb_high,
                                              price_bar.bb_low, False)
            # -1 if profitable short and not touching the SL
            return -1 if price_bar.high < stop_loss else 0
        return 0

    def print(self, msg):
        """Send ``msg`` to the debug log, prefixed with the algorithm time."""
        self.Debug(f"{self.Time} {msg}")
def agg_bars(minute_bars, start_time, end_time):
    """Collapse the bars between two times of day into one bar per symbol and day."""
    in_window = idx.filter_bars(minute_bars, start_time, end_time)
    per_symbol_day = in_window.groupby(GROUPER)
    return per_symbol_day.agg(AGG_OPS)
import math
import numpy as np
from math import factorial
from itertools import combinations
from sklearn.model_selection._split import _BaseKFold, indexable
class TimeSeriesSplitGroups(_BaseKFold):
    """Time-ordered cross-validator that splits on whole groups.

    The sorted unique groups are cut into ``n_splits + 1`` folds. Each split
    tests on one fold and trains on every group before it, minus the last
    ``purge_groups`` groups. Splits are produced from the most recent test
    fold backwards.
    """

    def __init__(self, n_splits=5, purge_groups=0):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.purge_groups = purge_groups

    def split(self, X, y=None, groups=None):
        """Yield ``(train_indices, test_indices)`` for each split.

        Raises:
            ValueError: if there are fewer groups than folds plus purged
                groups.
        """
        X, y, groups = indexable(X, y, groups)
        group_list = np.unique(groups)
        n_groups = len(group_list)
        n_folds = self.n_splits + 1
        if n_folds + self.purge_groups > n_groups:
            raise ValueError((f"Cannot have number of folds plus purged groups "
                              f"={n_folds+self.purge_groups} greater than the "
                              f"number of groups: {n_groups}."))
        test_size = (n_groups - self.purge_groups) // n_folds
        for fold in range(1, n_folds):
            test_start = n_groups - test_size * fold
            train_groups = group_list[:test_start - self.purge_groups]
            test_groups = group_list[test_start:test_start + test_size]
            train_mask = np.isin(groups, train_groups)
            test_mask = np.isin(groups, test_groups)
            yield (np.nonzero(train_mask)[0], np.nonzero(test_mask)[0])
class CombinatorialPurgedCV(_BaseKFold):
    """Combinatorial cross-validator over groups with purge and embargo.

    The sorted unique groups are cut into ``n`` consecutive folds and every
    combination of ``k`` folds is used once as the test set. Training folds
    next to a test fold are trimmed: ``embargo`` groups are dropped right
    after a test fold and ``purge + embargo`` groups right before one.
    """

    def __init__(self, n=4, k=2, purge=0, embargo=0):
        self.n = n
        self.k = k
        self.purge = purge
        self.embargo = embargo
        # Exact integer binomial coefficient C(n, k); true division went
        # through floats, which lose precision and overflow for large n.
        n_splits = factorial(n) // (factorial(k) * factorial(n - k))
        super().__init__(n_splits, shuffle=False, random_state=None)

    def split(self, X, y=None, groups=None):
        """Yield ``(train_indices, test_indices)`` for each fold combination.

        Raises:
            ValueError: if there are fewer unique groups than splits.
        """
        X, y, groups = indexable(X, y, groups)
        unique_groups = list(np.unique(groups))
        n_groups = len(unique_groups)
        required_folds = self.n_splits
        if required_folds > n_groups:
            raise ValueError((f"Required folds ={required_folds} greater than "
                              f"the number of groups: {len(unique_groups)}."))
        fold_size = int(math.ceil(n_groups / self.n))
        for test_fold in combinations(range(self.n), self.k):
            train_groups, test_groups = [], []
            for c in range(self.n):
                start = c * fold_size
                stop = min((c + 1) * fold_size, n_groups)  # To avoid going out of bound
                if c in test_fold:
                    test_groups += unique_groups[start:stop]
                    continue
                # Naive fold sizing, should be distributed before train/test split
                if c - 1 in test_fold:
                    start += self.embargo
                if c + 1 in test_fold:
                    # Clamp at 0: a negative stop would be read as an offset
                    # from the end of the list and pull later groups
                    # (including test groups) into the training set.
                    stop = max(stop - (self.purge + self.embargo), 0)
                train_groups += unique_groups[start:stop]
            train_idx = np.nonzero(np.isin(groups, train_groups))[0]
            test_idx = np.nonzero(np.isin(groups, test_groups))[0]
            yield train_idx, test_idx