| Overall Statistics |
|
Total Trades 13 Average Win 37.12% Average Loss -0.22% Compounding Annual Return 315.468% Drawdown 13.800% Expectancy 55.167 Net Profit 82.111% Sharpe Ratio 7.429 Probabilistic Sharpe Ratio 99.903% Loss Rate 67% Win Rate 33% Profit-Loss Ratio 167.50 Alpha 0 Beta 0 Annual Standard Deviation 0.497 Annual Variance 0.247 Information Ratio 7.429 Tracking Error 0.497 Treynor Ratio 0 Total Fees $13.00 Estimated Strategy Capacity $110000000.00 Lowest Capacity Asset GME SC72NCBXXAHX |
import pandas as pd
from io import StringIO
class Dropbox:
def __init__(self, context):
self.context = context
self.current_universe = []
self.live_url = "https://www.dropbox.com/s/yyx5dhlfurffkyw/cur_ment.csv?dl=1"
self.backtest_url = "https://www.dropbox.com/s/015ej53p1w7xgj2/backtest_debug-Sheet1-3.csv?dl=1"
self.backtest_df = None
self.df = None
self.num_mentions_include = 500
self.num_mentions_dropout = 100
self.timer_threshold = timedelta(days=14)
def get_tickers(self):
df = self.get_df()
if df is None:
tickers = self.check_timers()
self.current_universe = tickers
return tickers
tickers = self.filter_tickers(df)
self.current_universe = tickers
self.df = df
return tickers
def check_timers(self):
universe = []
for ticker in self.current_universe:
data = self.context.data[ticker]
if data.timer is True:
self.context.Debug(f"{ticker} is being checked @ {self.context.Time}, timer_exp: {data.timer_exp}")
if self.context.Time < data.timer_exp:
universe.append(ticker)
else:
self.context.Debug(f"Timer has expired for {ticker}, removing from Universe")
continue
else:
universe.append(ticker)
return universe
def get_df(self):
if self.context.LiveMode:
try:
url = self.live_url
csv = self.context.Download(url)
self.context.Debug(f"csv: {csv}")
df = pd.read_csv(StringIO(csv))
self.context.Debug(f"df: {df}")
df.columns = ['ticker', 'current_mentions']
df.set_index('ticker', inplace=True)
self.context.Debug(f"Returning DF: {df}")
return df
except:
self.context.Debug("-- CRITICAL -- Could NOT get csv in LiveMode")
return None
if self.backtest_df is not None:
row = self.get_row()
return row
url = self.backtest_url
csv = self.context.Download(url)
df = pd.read_csv(StringIO(csv))
cols = df.columns.values
cols[0] = 'datetime'
df.columns = cols
df.set_index('datetime', inplace=True)
self.backtest_df = df
row = self.get_row()
return row
def get_row(self):
# in live mode we are calling at t+1, therefore in backtesting we need to get the datetime of t-1 in order to pull it from the rows.
dt = self.context.Time - timedelta(minutes=self.context.check_buffer)
dt = dt.strftime('%Y-%m-%d %H:%M:00')
self.context.Debug(f"Trying to locate dt: {dt}")
try:
row = self.backtest_df.loc[[dt]]
except:
self.context.Debug(f"Could not find row {dt} in df")
return None
df = row.transpose()
df.index.rename('ticker', inplace=True)
df.columns = ['current_mentions']
self.context.Debug(f"transposed df: \n{df}")
return df
def filter_tickers(self, df):
'''
filter_tickers takes the current tickers and tickers existing in the universe already and evaluates the inclusion in the new (updated) universe.
Arguments:
df: pd.DF of mentions indexed by ticker
Returns:
list of tickers that should be included in the new (updated) universe.
'''
universe = []
ticker_list = [ ticker for ticker in df.index ]
for ticker in ticker_list:
mentions = df.loc[ticker, 'current_mentions']
if ticker in self.current_universe:
data = self.context.data[ticker]
if mentions > self.num_mentions_dropout:
if data.timer is False:
# ticker has not fallen below dropout threshold yet. Keep it.
universe.append(ticker)
else:
# ticker has previously triggered dropout but is now back above the dropout threshold.
if mentions > self.num_mentions_include:
# ticker has enough mentions to include and reset the timer. Keep it.
data.reset_timer()
universe.append(ticker)
else:
# ticker is above dropout threshold but not yet at the include threshold.
self.context.Debug(f"{ticker} is being checked @ {self.context.Time}, timer_exp: {data.timer_exp}")
if self.context.Time < data.timer_exp:
# ticker has not yet triggered timer threshold. Keep it.
universe.append(ticker)
else:
if data.in_trade is True:
self.context.Debug(f"{ticker} has expired but is in a trade, keeping in Universe")
universe.append(ticker)
else:
# ticker is above the dropout threshold, but has not met the include threshold and has triggered the timer threshold. Remove it.
self.context.Debug(f"Timer has expired for {ticker}, removing from Universe")
continue
else:
# ticker is below dropout threshold.
if data.timer is False:
# start the timer. Keep it.
data.start_timer(self.timer_threshold)
universe.append(ticker)
else:
self.context.Debug(f"{ticker} is being checked @ {self.context.Time}, timer_exp: {data.timer_exp}")
if self.context.Time < data.timer_exp:
# ticker is below dropout threshold but has not triggered timer threshold. Keep it.
universe.append(ticker)
else:
if data.in_trade is True:
universe.append(ticker)
else:
# ticker is below dropout threshold and has triggered timer threshold. Remove it.
self.context.Debug(f"Timer has expired for {ticker}, removing from Universe")
continue
else:
if mentions > self.num_mentions_include:
universe.append(ticker)
# get 'other tickers' that are in the current universe but have not been included in the csv this iteration
other_tickers = [ ticker for ticker in self.current_universe if ticker not in ticker_list ]
for ticker in other_tickers:
data = self.context.data[ticker]
if data.timer is False:
# ticker previously was in universe but had not triggered dropout.
data.start_timer(self.timer_threshold)
universe.append(ticker)
else:
# ticker previously was in universe and had triggered dropout.
self.context.Debug(f"{ticker} is being checked @ {self.context.Time}, timer_exp: {data.timer_exp}")
if self.context.Time < data.timer_exp:
universe.append(ticker)
else:
self.context.Debug(f"Timer has expired for {ticker}, removing from Universe")
continue
return universefrom SymbolData import SymbolData
from MyModels import MyKMeansModel
from Dropbox import Dropbox
import math
import pandas as pd
from io import StringIO
import pickle
class OptimizedParticleEngine(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 1, 12) # Set Start Date
# self.SetEndDate(2021, 6, 15) # Set End Date
self.SetCash(10000)
self.SetBrokerageModel( InteractiveBrokersBrokerageModel() )
self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))
self.base_res = Resolution.Minute
self.data = {}
self.models = {}
self.use_dropbox = False
self.dropbox = Dropbox(self)
self.UniverseSettings.Resolution = self.base_res
self.UniverseSettings.MinimumTimeInUniverse = TimeSpan.Zero
self.AddUniverse("dropbox", Resolution.Minute, self.get_universe)
self.dropbox_minutes = [0, 15, 30, 45]
self.check_buffer = 1
self.dropbox_check_minutes = [minute+self.check_buffer for minute in self.dropbox_minutes]
# self.reloaded = False
# self.current_universe = []
# if self.ObjectStore.ContainsKey("current_universe"):
# self.reload_universe()
# self.reload_dict = None
# self.reloaded = True
### PARAMS
self.risk_percent = 0.05
self.check_gap_opens = True
self.gap_close_percent = 100
self.previous = None
self.price_cutoff = 200
self.marketcap_cutoff = 1e10 # $10b=largecaps
self.num_largest = 25
self.threshold_mult = 3
self.can_trade_again = True
self.check_for_retraining = True
def reload_universe(self):
self.Debug(f"Reloading Universe from StoredObjects")
deserialized = bytes(self.ObjectStore.ReadBytes("current_universe"))
# self.Debug(f"Deserialized: {deserialized}")
universe_dict = pickle.loads(deserialized)
self.reload_dict = universe_dict
self.current_universe = list(universe_dict.keys())
self.Debug(f"Previous Current Universe Reloaded: {self.reload_dict}, self.current_universe: {self.current_universe}")
def get_universe(self, date):
if self.use_dropbox is False:
all_tickers = ['GME', 'AMC', 'BB', 'RKT', 'TSLA', 'SPCE', 'PLTR', 'SNDL', 'NOK', 'MVIS', 'BABA', 'NVDA', 'AMD', 'CLOV']
one = ['GME']
tickers = one#all_tickers
return tickers
if date.minute not in self.dropbox_check_minutes:
return self.current_universe
self.Debug(f"Checking Dropbox for new Universe @ {self.Time}")
tickers = self.dropbox.get_tickers()
self.current_universe = tickers
self.Debug(f"Returning Universe: {self.current_universe}")
# fresh = [ticker for ticker in tickers if ticker not in self.current_universe]
# for ticker in fresh:
# self.data[ticker] = None
# current_data = {ticker:self.data[ticker] for ticker in tickers if ticker in self.data.keys()}
# current_data = {ticker: None for ticker in tickers}
current_data = {ticker: self.data[ticker].timer_exp if ticker in self.data else None for ticker in tickers}
serialized = pickle.dumps(current_data)
self.ObjectStore.SaveBytes("current_universe", serialized)
return tickers
def evaluate(self, symbol, bar):
ticker = symbol.Value
self.Debug(f"Evaluating Data for {ticker}")
if ticker not in self.data:
return
data = self.data[ticker]
if not data.IsReady():
self.Debug(f"{self.Time}: {ticker} data is not ready!")
return
good_price = bar.Close < self.price_cutoff
if not good_price:
# self.Debug(f"Refusing to trade {ticker} @ {bar.Close}")
return
# if self.models[ticker] is None:
# self.Debug(f"{ticker} Model is not trained, training model.")
# self.train_model(ticker)
# # trained model and threshold saved in symboldata
# df = data.build_df(live=True)
# if df is None:
# self.Debug(f"build_df returned None")
# return
# model = self.models[ticker]
# features = model.get_features(df, live=True)
# distances = model.model_transform(features)
# current = distances.values[-1]
# threshold = data.threshold
# get indicator values
upper_channel_prev = data.channel.upper_history[1]
lower_channel_prev = data.channel.upper_history[1]
mid_channel = data.channel.mid_history[0]
st_fifteen = data.supertrend_fifteen.Value
st_fifteen_prev = data.supertrend_fifteen.val_window[1]
st_hour = data.supertrend_hour.Value
st_hour_prev = data.supertrend_hour.val_window[1]
ema_fast = data.ema_fast.Current.Value
ema_slow = data.ema_slow.Current.Value
# get bar values
hour_bar = data.hour_bars[0]
# define conditions
# anomaly = current > threshold
long_break = bar.Close > upper_channel_prev
short_break = bar.Close < lower_channel_prev
# up_diff = bar.Close > data.bars[1].Close
# long_environment = ema_fast > ema_slow
can_trade = data.can_trade
go_long = long_break and can_trade
exit_trigger = hour_bar.Close < st_hour
exit_long = exit_trigger
# self.Debug(f"Current Distance: {current}, Threshold: {threshold}, IsAnomaly? {current > threshold}")
# self.Debug(f"ema_fast: {ema_fast}, ema_slow: {ema_slow}")
if self.Portfolio[symbol].Invested:
if data.no_exit > 0:
data.no_exit += -1
self.Debug(f"{data.no_exit}")
if data.no_exit > 0:
exit_long = False
return
if exit_long:
self.Debug(f"-- {ticker} LONG EXIT @ {hour_bar.Close} --")
self.Debug(f"Open: {hour_bar.Open}, High: {hour_bar.High}, Low: {hour_bar.Low}, Close: {hour_bar.Close}, Volume: {hour_bar.Volume}, BarTime: {hour_bar.Time}")
self.Debug(f"supertrend_hour: {st_hour}, supertrend_hour_prev: {st_hour_prev}")
self.Liquidate(symbol, 'Liquidate Position (Exit Long)')
data.in_trade = False
if self.can_trade_again:
data.can_trade = True
# if self.check_for_retraining:
# data.retrain_model = True
if not self.Portfolio[symbol].Invested:
if go_long:
self.Debug(f"-- {ticker} LONG ENTRY @ {bar.Close}")#", Current Distance: {current}, Threshold: {threshold}")
self.Debug(f"Open: {bar.Open}, High: {bar.High}, Low: {bar.Low}, Close: {bar.Close}, Volume: {bar.Volume}, BarTime: {bar.Time}")
self.Debug(f"supertrend_fifteen: {st_fifteen}, supertrend_fifteen_prev: {st_fifteen_prev}")
sl_price = st_fifteen
# sl_price = bar.Close - (bar.Close*0.1)
sl_price = round(sl_price, 2)
quantity = self.calculate_quantity(symbol, bar.Close, sl_price)
entry = self.MarketOrder(symbol, quantity, False, 'Long Entry')
sl = self.StopMarketOrder(symbol, -quantity, sl_price, 'Long SL')
data.in_trade = True
# data.can_trade = False
data.no_exit = 3
def OnData(self, base_data):
'''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
Arguments:
base_data: Slice object keyed by symbol containing the stock data at base_res
'''
if self.check_gap_opens:
if self.previous is not None and self.Time.date() == self.previous.date():
return
for ticker in self.data.keys():
symbol = self.data[ticker].symbol
if self.Portfolio[symbol].Invested:
if not base_data.Bars.ContainsKey(symbol):
self.Debug(f"{ticker} not in OnData slice. Time: {self.Time}")
continue
data = self.data[ticker]
current_open = base_data[symbol].Open
previous_close = data.bars[0].Close
distance = (current_open - previous_close) / previous_close * 100
if distance >= self.gap_close_percent:
self.Liquidate(symbol, "Liquidate Position (Gap Open)")
self.Debug(f"--!! Gap open of: {round(distance, 2)}%, closing position !!--")
self.Debug(f"Last BarTime: {data.bars[0].EndTime}, Close: {previous_close}")
self.Debug(f"Current BarTime: {base_data[symbol].EndTime}, Open: {current_open}")
data.in_trade = False
if self.can_trade_again:
data.can_trade = True
if self.check_for_retraining:
data.retrain_model = True
# if self.check_for_retraining:
# for ticker in self.data.keys():
# data = self.data[ticker]
# if data.retrain_model:
# self.train_model(ticker)
# data.retrain_model = False
# by setting self.previous we only check gaps
self.previous = self.Time
def calculate_quantity(self, symbol, close, sl_value):
equity = self.Portfolio.TotalPortfolioValue
risk = self.risk_percent
conversion = self.Securities[symbol].QuoteCurrency.ConversionRate
# quantity = (equity * risk) / (abs(close - sl_value) * conversion)
quantity = (equity * risk) / close * conversion
# quantity = cash_amount / close * conversion
quantity = math.floor(quantity)
self.Debug(f"Calculated Quantity as: {quantity}")
return quantity
# def train_model(self, ticker):
# self.Debug(f"-- Started Training the Model for {ticker} @ {self.Time}")
# data = self.data[ticker]
# history = data.build_df(training=True)
# self.Debug(f"Got {len(history)} of History for {ticker}")
# kmeans = MyKMeansModel(self, ticker)
# features = kmeans.get_features(history, training=True)
# self.Debug(f"Got {len(features)} Features for {ticker}")
# kmeans.create_model()
# self.Debug(f"{ticker} Model Created")
# kmeans.fit_model(features)
# self.Debug(f"{ticker} Model Fit")
# distances = kmeans.model_transform(features)
# # self.Debug(f"Features Transformed: {len(distance)}")
# num_largest = self.num_largest
# threshold_mult = self.threshold_mult
# threshold = distances.nlargest(num_largest).sum() / num_largest * threshold_mult
# self.data[ticker].threshold = threshold
# self.models[ticker] = kmeans
# self.Debug(f"-- Finished training model for {ticker}, threshold calculated and saved in data: {self.data[ticker].threshold}")
def OnSecuritiesChanged(self, changes):
self.Debug(f"-- OnSecuritiesChanged triggered, added: {len(changes.AddedSecurities)}, removed: {len(changes.RemovedSecurities)}")
for removed in changes.RemovedSecurities:
ticker = removed.Symbol.Value
data = self.data.pop(ticker, None)
if data is not None:
data.remove_consolidators()
model = self.models.pop(ticker, None)
self.Debug(f"{ticker} Removed from Universe in OnSecuritiesChanged")
symbols = [ x.Symbol for x in changes.AddedSecurities ]
history = self.History(symbols, 10000, Resolution.Minute)
if history.empty:
self.Debug(f"-- Finished making changes to the Universe")
self.Debug(f"Active Securities: {[symbol.Value for symbol in self.ActiveSecurities.Keys]}")
self.Debug(f"Securities: {[symbol.Value for symbol in self.Securities.Keys]}")
return
for added in changes.AddedSecurities:
symbol = added.Symbol
ticker = symbol.Value
if ticker not in self.data:
self.data[ticker] = SymbolData(self, symbol)
elif self.data[ticker] is None:
self.data[ticker] = SymbolData(self, symbol)
# self.models[ticker] = None
try:
ticker_history = history.loc[symbol]
except:
self.Debug(f"Cannot add {ticker} to Universe because it is not in History!")
continue
if len(ticker_history) < 5000:
self.Debug(f"Cannot add {ticker} to Universe because it does not have enough History! Len: {len(ticker_history)}")
continue
self.data[ticker].warm_up_history(ticker_history)
self.Debug(f"{ticker} Added to Universe in OnSecuritiesChanged")
# self.train_model(ticker)
self.Debug(f"-- Finished making changes to the Universe")
self.Debug(f"Active Securities: {[symbol.Value for symbol in self.ActiveSecurities.Keys]}")
self.Debug(f"Securities: {[symbol.Value for symbol in self.Securities.Keys]}")
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler
class MyKMeansModel:
def __init__(self, context, symbol):
self.context = context
self.symbol = symbol
self.model = None
self.scaler = None
self.should_drop_ohlcv = False
self.should_winsorize = False
self.winsorize_by = 'close_shift'
self.winsorize_percent = 5
def create_model(self):
n_clusters = 5
init = 'k-means++'
n_init = 10
tol = 1e-04
# rs = 42 #None
kmeans = KMeans(
n_clusters=n_clusters,
init=init,
n_init=n_init,
tol=tol,
# random_state=rs
)
self.model = kmeans
def fit_model(self, scaled_df):
model = self.model
model.fit(scaled_df)
def model_transform(self, scaled_df):
# self.context.Debug(f"Transforming LIVE Data")
transformed = self.model.transform(scaled_df).min(axis=1)
# self.context.Debug(f"Transformed LIVE Data")
distance = pd.Series(transformed)
# self.context.Debug(f"Made pd.Series of Distances")
return distance
def get_features(self, history, training=False, live=False):
# symbol_hist = history.loc[symbol]
df = history.copy()
df.dropna(axis=0, inplace=True)
# self.context.Debug(df.head())
closes = df['close']
fast_len = 20
slow_len = 50
# df['ema_fast'] = closes.rolling(window=fast_len).mean()
# df['ema_slow'] = closes.rolling(window=slow_len).mean()
ema_fast = closes.rolling(window=fast_len).mean()
ema_slow = closes.rolling(window=slow_len).mean()
# self.context.Debug("Computed MAs")
df['trend_fast'] = closes / ema_fast
df['trend_slow'] = closes / ema_slow
df['close_shift'] = closes - closes.shift(1)
df['volume_shift'] = df['volume'] - df['volume'].shift(1)
df['close_diff'] = closes.diff(1)
df['volume_diff'] = df['volume'].diff(1)
df['HL'] = df['high'] - df['low']
df['HL_vol'] = df['HL'].rolling(window=50).std()
df.dropna(axis=0, inplace=True)
if self.should_drop_ohlcv:
to_drop = ['open', 'high', 'low', 'close', 'volume']
df.drop(to_drop, axis=1, inplace=True)
# self.context.Debug("Ready to scale and/or winsorize")
if training:
if self.should_winsorize:
wins_df = self.winsorize_df(df)
scaled_df = self.scale_df(wins_df, training=True)
else:
scaled_df = self.scale_df(df, training=True)
df = scaled_df
elif live:
scaled_df = self.scale_df(df, live=True)
df = scaled_df
return df
def scale_df(self, df, training=False, live=False):
df = df.copy()
if training:
# self.context.Debug(f"Scaling TRAINING Features, columns:{len(df.columns)}")
scaler = MinMaxScaler()
scaler = scaler.fit(df)
self.scaler = scaler
# self.context.Debug(self.scaler)
scaled = self.scaler.transform(df)
elif live:
# self.context.Debug(f"Scaling LIVE Features, columns:{len(df.columns)}")
scaler = self.scaler
scaled = scaler.transform(df)
return scaled
def winsorize_df(self, df):
# self.context.Debug("Winsorizing")
wins_df = df.copy()
to_wins = wins_df[self.winsorize_by]
drop_percent = self.winsorize_percent / 2
lower = 0 + (drop_percent/100)
upper = 1 - (drop_percent/100)
# self.context.Debug("Calculated upper/lower for wins")
wins = to_wins[ to_wins.between(to_wins.quantile(lower), to_wins.quantile(upper)) ]
# self.context.Debug(f"Got wins, len: {len(wins)}, {wins.index}")
wins_df = df.reindex(wins.index)
# self.context.Debug(f"Got wins_df, len: {len(wins_df)}")
dropped = df[~df.isin(wins_df)].dropna()
# dropped = len(df) - len(wins_df)
# self.context.Debug(f"Finished winsorizing, dropped samples len: {len(dropped)}")
return wins_dffrom CustomIndicators import SuperTrend, Channel
import pandas as pd
class SymbolData:
def __init__(self, context, symbol):
self.context = context
self.symbol = symbol
self.ticker = symbol.Value
self.window_period = 500
self.bars = RollingWindow[TradeBar](self.window_period)
self.hour_bars = RollingWindow[TradeBar](24)
self.channel = Channel(120)
self.supertrend_fifteen = SuperTrend(7, 3)
self.supertrend_hour = SuperTrend(7, 3)
self.ema_fast = ExponentialMovingAverage(20)
self.ema_slow = ExponentialMovingAverage(50)
self.fifteen_consolidator = TradeBarConsolidator(timedelta(minutes=15))
self.fifteen_consolidator.DataConsolidated += self.fifteen_consolidated
context.SubscriptionManager.AddConsolidator(symbol, self.fifteen_consolidator)
context.RegisterIndicator(symbol, self.channel, self.fifteen_consolidator)
context.RegisterIndicator(symbol, self.supertrend_fifteen, self.fifteen_consolidator)
context.RegisterIndicator(symbol, self.ema_fast, self.fifteen_consolidator)
context.RegisterIndicator(symbol, self.ema_slow, self.fifteen_consolidator)
self.threshold = None
self.can_trade = True
self.in_trade = False
self.retrain_model = False
self.timer = False
self.timer_exp = None
self.no_exit = 0
def fifteen_consolidated(self, sender, bar):
self.bars.Add(bar)
### Why not use self.context.Time and create own custom hourly bars and push to own consolidated function and execute relevant logic??
t = bar.EndTime
# self.context.Debug(f"{self.ticker} 15min t: {t}, close: {bar.Close}")
if t.minute == 30:
bars = list(self.bars)[:4]
tb = self.make_tradebar(bars)
self.hour_consolidated(tb)
elif t.hour == 16:
bars = list(self.bars)[:2]
tb = self.make_tradebar(bars)
self.hour_consolidated(tb)
self.context.evaluate(self.symbol, bar)
def make_tradebar(self, bars):
tb = TradeBar()
tb.Time = bars[-1].Time
tb.Open = bars[-1].Open
tb.High = max([bar.High for bar in bars])
tb.Low = min([bar.Low for bar in bars])
tb.Close = bars[0].Close
tb.Volume = sum([bar.Volume for bar in bars])
tb.EndTime = bars[0].EndTime
return tb
def hour_consolidated(self, bar):
self.hour_bars.Add(bar)
self.supertrend_hour.Update(bar)
# self.context.Debug(f"{self.ticker} 60min t: {bar.EndTime}, close: {bar.Close}")
# self.context.Debug(f"{self.ticker} hour supertrend: {self.supertrend_hour.Value}")
# if not self.context.IsWarmingUp:
# self.context.Debug(f"Hour Consolidated, BarTime: {bar.Time}, Open: {bar.Open}, Close: {bar.Close}")
def remove_consolidators(self):
self.context.SubscriptionManager.RemoveConsolidator(self.symbol, self.fifteen_consolidator)
# self.context.SubscriptionManager.RemoveConsolidator(self.symbol, self.hour_consolidator)
self.context.Debug(f"Consolidators for {self.ticker} have been removed")
def warm_up_history(self, history, consolidators=[15, 60]):
hist = history[['open', 'high', 'low', 'close', 'volume']].copy()
for period in consolidators:
resampled = self.resample(hist, period)
for idx, bar in resampled.iterrows():
tb = TradeBar()
tb.Open = bar.open
tb.High = bar.high
tb.Low = bar.low
tb.Close = bar.close
tb.Volume = bar.volume
tb.Time = idx - timedelta(minutes=15)
tb.EndTime = idx
if period == 15:
self.bars.Add(tb)
self.channel.Update(tb)
self.supertrend_fifteen.Update(tb)
self.ema_fast.Update(tb.EndTime, tb.Close)
self.ema_slow.Update(tb.EndTime, tb.Close)
elif period == 60:
self.hour_bars.Add(tb)
self.supertrend_hour.Update(tb)
self.context.Debug(f"Finished warming up data for {self.ticker}, self.bars len: {self.bars.Count}")
def resample(self, history, mins):
logic = {'open' : 'first',
'high' : 'max',
'low' : 'min',
'close' : 'last',
'volume': 'sum'}
resampled = history.resample(f'{mins}min', closed='right', label='right', base=30).apply(logic)
resampled.dropna(axis=0, inplace=True)
# self.context.Debug(f"resampled: {resampled.to_string()}")
return resampled
def IsReady(self):
bars = self.bars.IsReady
indicators = self.supertrend_fifteen.IsReady and self.supertrend_hour.IsReady and self.channel.IsReady
return bars and indicators
def build_df(self, training=False, live=False):
if self.bars.Count != self.window_period:
self.context.Debug(f"self.bars not full so can't build_df, len: {self.bars.Count}")
return None
# self.context.Debug(f"Building DF, live={live}, bars len: {self.bars.Count}")
dts = []
opens = []
highs = []
lows = []
closes = []
volumes = []
for bar in self.bars:
dts.append(bar.EndTime)
opens.append(bar.Open)
highs.append(bar.High)
lows.append(bar.Low)
closes.append(bar.Close)
volumes.append(bar.Volume)
bars_dict = {'open': opens, 'high': highs, 'low': lows, 'close': closes, 'volume': volumes}
df = pd.DataFrame(bars_dict, index=dts)
# self.context.Debug(f"DF len: {len(df)}")
df.sort_index(axis=0, ascending=True, inplace=True)
if training:
df = df.tail(450)
elif live:
df = df.tail(160)
return df
def start_timer(self, td):
self.timer = True
self.timer_exp = self.context.Time + td
self.context.Debug(f"Timer has been STARTED for {self.ticker} with expiration of {self.timer_exp}")
def reset_timer(self):
self.timer = False
self.timer_exp = None
self.context.Debug(f"Timer has been RESET for {self.ticker}")class SuperTrend:
def __init__(self, period, mult):
self.Name = "SuperTrend ({}, {})".format(period, mult)
self.Value = 0
self.IsReady = False
self.bars = RollingWindow[IBaseData](period)
self.atr = AverageTrueRange(period)
self.period = period
self.mult = mult
self.upper_prev = 0
self.lower_prev = 0
self.trend_prev = 0
self.st_prev = 0
self.val_window = RollingWindow[float](5)
def Update(self, bar):
self.bars.Add(bar)
self.atr.Update(bar)
close = bar.Close
prev_close = self.bars[1].Close if (self.bars.Count > 1) else 0
hl2 = (bar.High + bar.Low) / 2
atr = self.atr.Current.Value
factor = self.mult * atr
upper_basic = hl2 + factor
lower_basic = hl2 - factor
upper_final = upper_basic if ((upper_basic < self.upper_prev) or (prev_close > self.upper_prev)) else self.upper_prev
lower_final = lower_basic if ((lower_basic > self.lower_prev) or (prev_close < self.lower_prev)) else self.lower_prev
st = 0
if self.st_prev == self.upper_prev:
st = upper_final if (close <= upper_final) else lower_final
if self.st_prev == self.lower_prev:
st = lower_final if (close >= lower_final) else upper_final
self.upper_prev = upper_final
self.lower_prev = lower_final
self.st_prev = st
self.Value = st
self.val_window.Add(st)
self.IsReady = self.bars.IsReady
class Channel:
def __init__(self, lookback, channel_type='close'):
self.Name = f"Channel ({lookback})"
self.Value = 0
self.IsReady = False
self.channel_type = channel_type
self.data = RollingWindow[IBaseData](lookback)
self.upper_history = RollingWindow[float](2)
self.lower_history = RollingWindow[float](2)
self.mid_history = RollingWindow[float](2)
self.upper = None
self.lower = None
self.mid = None
def Update(self, bar):
self.data.Add(bar)
if not self.data.IsReady:
return
if self.channel_type == 'close':
self.upper = max([bar.Close for bar in self.data])
self.lower = min([bar.Close for bar in self.data])
self.mid = (self.upper + self.lower) / 2
elif self.channel_type == 'high_low':
self.upper = max([bar.High for bar in self.data])
self.lower = min([bar.Low for bar in self.data])
self.mid = (self.upper + self.lower) / 2
self.upper_history.Add(self.upper)
self.lower_history.Add(self.lower)
self.mid_history.Add(self.mid)
if self.IsReady is False and self.upper_history.IsReady:
self.IsReady = True