| Overall Statistics | |
|---|---|
| Total Trades | 34 |
| Average Win | 1.23% |
| Average Loss | -0.46% |
| Compounding Annual Return | 19.425% |
| Drawdown | 0.700% |
| Expectancy | 2.008 |
| Net Profit | 16.878% |
| Sharpe Ratio | 2.307 |
| Probabilistic Sharpe Ratio | 99.708% |
| Loss Rate | 18% |
| Win Rate | 82% |
| Profit-Loss Ratio | 2.65 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0.057 |
| Annual Variance | 0.003 |
| Information Ratio | 2.307 |
| Tracking Error | 0.057 |
| Treynor Ratio | 0 |
| Total Fees | $4763.05 |
| Estimated Strategy Capacity | $240000.00 |
| Lowest Capacity Asset | UVIX XX86EIYKAK4L |
| Portfolio Turnover | 10.52% |
from AlgorithmImports import *
from datetime import date, time, datetime
import pandas as pd
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import GridSearchCV
import joblib
from scipy.spatial.distance import pdist
class ExampleAlpha(AlphaModel):
    '''Alpha model that emits long insights from each symbol's classifier prediction.

    One SymbolData object is kept per security in the universe; an insight is
    produced when that object reports a positive prediction, the symbol is not
    already held, and enough time is left before 15:58 to hold the position.'''

    def __init__(self, algorithm: QCAlgorithm, lookback: timedelta):
        ''' Initialize a new instance of ExampleAlpha

        Args:
            algorithm: The algorithm instance; its Time and Portfolio are read in Update
            lookback: History span forwarded to every SymbolData that gets created'''
        self.symbols_data = {}
        self.algorithm = algorithm
        self.lookback = lookback

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        ''' Generate insights as new data is fed through the algorithm.

        Args:
            algorithm: The algorithm instance (the one stored at construction is used)
            data: The current data slice (not read here)
        Returns:
            A list with one upward price insight per symbol that qualifies'''
        insights = []
        # Time left until the 15:58 cut-off; identical for every symbol in this call
        remaining = self.EndOfTradingDay(self.algorithm.Time)
        for sym, obj in self.symbols_data.items():
            # target_shift five-minute bars, minus one minute
            holding_period = (timedelta(minutes=5) * obj.target_shift) - timedelta(minutes=1)
            if not obj.IsReady() or remaining <= holding_period:
                continue
            if obj.prediction and not self.algorithm.Portfolio[sym].Invested:
                insights.append(Insight.Price(sym, holding_period, InsightDirection.Up, 0.001, None, None, 1.0))
        return insights

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        ''' Make one-time actions on securities as they enter or exit the universe.

        Args:
            algorithm: The algorithm instance handed to new SymbolData objects
            changes: The securities added to and removed from the universe'''
        for security in changes.AddedSecurities:
            self.symbols_data[security.Symbol] = SymbolData(algorithm, security.Symbol, self.lookback)
        for security in changes.RemovedSecurities:
            # pop with a default covers symbols that were never tracked
            symbol_data = self.symbols_data.pop(security.Symbol, None)
            if symbol_data is not None:
                symbol_data.Dispose()

    def EndOfTradingDay(self, current: datetime) -> timedelta:
        ''' Compute and return the timedelta from `current` to 15:58 on the same date.

        The result is zero or negative once `current` is at or past 15:58.'''
        close = datetime.combine(current.date(), time(15, 58))
        return close - current
class SymbolData:
    '''Per-symbol container for the consolidator, indicators, rolling windows and models.

    Five-minute bars from the consolidator update the indicators; every indicator
    update and every consolidated bar is stored in a rolling window. Once all
    windows and indicators are ready, each bar produces a prediction and the
    model is retrained whenever the ISO week number changes.'''

    def __init__(self, algorithm: 'QCAlgorithm', symbol: 'Symbol', lookback: 'timedelta'):
        ''' Initialize a new instance of SymbolData

        Args:
            algorithm: The algorithm instance used for history, registration and logging
            symbol: The symbol this object tracks
            lookback: History span requested to size the rolling windows'''
        self.algorithm = algorithm
        self.symbol = symbol
        self.lookback = lookback
        self.consolidator_period = timedelta(minutes=5)
        self.history = self.algorithm.History(self.symbol, self.lookback, self.algorithm.UniverseSettings.Resolution)
        # 95% of the history row count, read as minutes, expressed in consolidated bars
        # NOTE(review): this presumes minute-resolution history - confirm
        self.window_size = timedelta(minutes=self.history.shape[0] * 0.95) // (self.consolidator_period)
        self.indicators = []
        self.combinations = []
        self.selected_features = []
        self.rolling_windows = {}
        self.features = pd.DataFrame()
        self.training_data = pd.DataFrame()
        self.models = Models(algorithm=algorithm)
        self.last_trained = 0
        self.prediction = 0
        self.proba = None
        self.required_confidence = 0.60
        self.target_shift = 4
        # Create indicators; each one is named, wired to its rolling window and stored in self.indicators
        self.adx = self._TrackIndicator(AverageDirectionalIndex(14), 'ADX')
        self.rsi = self._TrackIndicator(RelativeStrengthIndex(14, movingAverageType=2), 'RSI')
        self.cci = self._TrackIndicator(CommodityChannelIndex(14, movingAverageType=2), 'CCI')
        self.ema = self._TrackIndicator(ExponentialMovingAverage(20), 'EMA')
        self.tema = self._TrackIndicator(TripleExponentialMovingAverage(40), 'TripleEMA')
        self.crossover = self._TrackIndicator(IndicatorExtensions.Over(self.tema, self.ema, 'Crossover'))
        # Combination indicators are treated differently than regular indicators. They don't inherit from
        # IIndicatorWarmUpPeriodProvider and so cannot benefit from WarmUpIndicator,
        # and shouldn't be registered to the consolidator.
        self.combinations.append(self.crossover)
        # Determine which features will be used in machine learning models
        self.selected_features = ['RSI', 'ADX', 'CCI', 'Crossover'] + ['volume']
        # Create a consolidator to update the indicators and a rolling window for consolidated bars.
        self.consolidator = TradeBarConsolidator(self.consolidator_period)
        self.rolling_windows['Price'] = RollingWindow[TradeBar](self.window_size)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator)
        # Create the indicator rolling windows and register the plain indicators to the consolidator
        for indicator in self.indicators:
            self.rolling_windows[indicator.Name] = RollingWindow[IndicatorDataPoint](self.window_size)
            if indicator not in self.combinations:
                self.algorithm.RegisterIndicator(self.symbol, indicator, self.consolidator)
        # Subscribed last so that, assuming handlers run in subscription order, the indicators have
        # already consumed the bar when OnDataConsolidated reads their windows.
        self.consolidator.DataConsolidated += self.OnDataConsolidated

    def _TrackIndicator(self, indicator, name=None):
        ''' Optionally names the indicator, routes its updates into its rolling window, stores and returns it.'''
        if name is not None:
            indicator.Name = name
        # The window is looked up when the event fires, so it may be created after this call
        indicator.Updated += (lambda sender, updated: self.rolling_windows[indicator.Name].Add(updated))
        self.indicators.append(indicator)
        return indicator

    @staticmethod
    def _ClassProbability(model, proba, label) -> float:
        ''' Returns the probability that `proba` assigns to `label`, or 0.0 if the model never saw that label.'''
        classes = list(getattr(model, 'classes_', []))
        if label in classes:
            return float(proba[classes.index(label)])
        return 0.0

    def OnDataConsolidated(self, sender: object, consolidated_bar: 'TradeBar') -> None:
        ''' Performs actions when the consolidator emits a bar: store it, predict, and retrain when due.'''
        # Update the consolidated price rolling window
        self.rolling_windows['Price'].Add(consolidated_bar)
        if not self.IsReady():
            if not self.algorithm.IsWarmingUp:
                windows = [ind for ind, win in self.rolling_windows.items() if not win.IsReady]
                indicators = [ind.Name for ind in self.indicators if not ind.IsReady]
                self.algorithm.Debug(f'Waiting on windows: {windows}, indicators: {indicators} : {self.algorithm.Time}')
            return
        # Use the most recent consolidated bar as prediction inputs.
        self.features = self.PrepareInputs(consolidated_bar)
        # If a model is available and not being trained, predict the direction of the next n-bars.
        model = self.models.trained_models['KNN']
        if model is not None and not self.models.training:
            predicted = model.predict(self.features)[0]
            self.proba = model.predict_proba(self.features).flatten()
            # Targets are signs (-1/0/1), so compare against the label explicitly: -1 is truthy,
            # and the column holding label 1 depends on which labels were present in training.
            up_probability = self._ClassProbability(model, self.proba, 1)
            self.prediction = 1 if predicted == 1 and up_probability > self.required_confidence else 0
        # Retrain whenever the ISO week number changes
        week = self.algorithm.Time.isocalendar()[1]
        if self.last_trained != week:
            self.last_trained = week
            training_features, training_targets = self.PrepareTrainingData()
            self.models.TrainModels(
                training_data=training_features.join(training_targets, how='inner', on='time'),
            )

    def PrepareTrainingData(self) -> 'tuple[pd.DataFrame, pd.DataFrame]':
        ''' Summarizes the rolling windows into a single DataFrame. Returns (training features, training targets).'''
        # Rebuild the training data from the consolidated price window
        self.training_data = self.algorithm.PandasConverter.GetDataFrame[TradeBar](list(self.rolling_windows['Price'])).loc[self.symbol]
        # Join every indicator window onto the bars by time
        for indicator in self.indicators:
            key = indicator.Name
            wind = self.rolling_windows[key]
            indicator_df = self.algorithm.PandasConverter.GetDataFrame[IndicatorDataPoint](list(wind)).reset_index().drop(columns='symbol').set_index('time').rename(columns={'value': key})
            self.training_data = self.training_data.join(indicator_df, how='inner', on='time')
        # Reverse the order of the DataFrame such that the most recent consolidated bar is the last row
        self.training_data = self.training_data[::-1]
        # Target: sign of (log return - std of log returns), taken target_shift rows ahead
        log_returns = np.log(1 + self.training_data.close.pct_change())
        self.training_data['target'] = log_returns - log_returns.std()
        self.training_data['target'] = np.sign(self.training_data.target).shift(-self.target_shift)
        self.training_data = self.training_data.dropna()
        training_features = self.training_data.loc[:, self.selected_features].copy()
        # 'target' was added last, so it is the final column
        training_targets = self.training_data.iloc[:, -1].to_frame()
        return training_features, training_targets

    def PrepareInputs(self, consolidated_bar: 'TradeBar') -> pd.DataFrame:
        ''' Extracts the most recent consolidated data into a single-row DataFrame of the selected features.'''
        latest_values = {x.Name: self.rolling_windows[x.Name][0].Value for x in self.indicators if x.Name in self.selected_features}
        self.features = pd.DataFrame(data=latest_values, index=pd.Series(consolidated_bar.EndTime))
        bar = self.rolling_windows['Price'][0]
        prices = pd.DataFrame(data={'open': bar.Open, 'high': bar.High, 'low': bar.Low, 'close': bar.Close, 'volume': bar.Volume}, index=pd.Series(bar.EndTime))
        self.features = prices.join(self.features)
        return self.features.loc[:, self.selected_features]

    def IsReady(self) -> bool:
        ''' Returns true if all rolling windows and all indicators are ready'''
        windows_ready = all(win.IsReady for win in self.rolling_windows.values())
        indicators_ready = all(x.IsReady for x in self.indicators)
        return windows_ready and indicators_ready

    # If you have a dynamic universe, remove consolidators for the securities removed from the universe
    def Dispose(self) -> None:
        ''' Removes this symbol's consolidator from the subscription manager.'''
        self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)
class Models:
    ' Various models for forward looking insights.'

    def __init__(self, algorithm: 'QCAlgorithm'):
        ''' Initializes a new instance of Models

        Args:
            algorithm: The algorithm instance; its ObjectStore is used by LoadModel'''
        self.algorithm = algorithm
        self.training = False
        self.trained_models = {
            'KNN': None
        }

    def Preprocess(self, training_data: pd.DataFrame) -> 'tuple[pd.DataFrame, pd.DataFrame]':
        ''' Splits the summary DataFrame into (features, targets).

        Features are every column except the last; targets are the 'target' column as a one-column frame.'''
        train_x = training_data.iloc[:, :-1].copy()
        train_y = training_data.target.to_frame()
        return train_x, train_y

    def TrainModels(self, training_data: pd.DataFrame):
        ''' Trains all models. Does nothing if a training run is already flagged as in progress.'''
        if self.training:
            return
        self.training = True
        try:
            training_features, training_targets = self.Preprocess(
                training_data=training_data,
            )
            self.TrainSklearnKNN(
                training_features=training_features,
                training_targets=training_targets,
            )
            self.TradingViewLorentzianClassification(
                training_features=training_features,
                training_targets=training_targets,
            )
            self.ResearchLorentzianClassification(
                training_features=training_features,
                training_targets=training_targets,
            )
        finally:
            # Always clear the flag so a failed run cannot block predictions and retraining forever
            self.training = False

    def TrainSklearnKNN(self, training_features: pd.DataFrame, training_targets: pd.DataFrame):
        ''' Trains, optimizes and stores an SKlearn K-Nearest Neighbors classifier under the 'KNN' key.

        The scaler is part of the estimator being grid-searched, so the neighbor count is
        chosen on the same scaled feature space the stored model predicts on.'''
        was_training = self.training
        self.training = True
        try:
            pipe = Pipeline([
                ('scaler', StandardScaler()),
                ('nearest_neighbors', KNeighborsClassifier()),
            ])
            param_grid = [
                {
                    'nearest_neighbors__n_neighbors': list(np.arange(2, 21, 2)),
                    'nearest_neighbors__leaf_size': list(np.arange(1, 2, 1)),
                    'nearest_neighbors__metric': ['cityblock'],
                }]
            # Flatten the one-column target frame into the 1-D label array
            targets = np.ravel(training_targets)
            gcv = GridSearchCV(estimator=pipe, param_grid=param_grid, cv=10, verbose=0, n_jobs=-1).fit(training_features, targets)
            # With the default refit, best_estimator_ is the winning pipeline refit on all the data
            self.trained_models['KNN'] = gcv.best_estimator_
        finally:
            self.training = was_training
        # Save the model to ObjectStore
        # file_name = self.algorithm.ObjectStore.GetFilePath(model_key)
        # joblib.dump(model, file_name)
        # self.algorithm.ObjectStore.Save(model_key)

    def TradingViewLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame):
        ' Replicates the model logic from the Trading View indicator: Lorentzian Classification (not implemented yet)'
        pass

    def ResearchLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame):
        ''' Modifies the Trading View Lorentzian Classification by adhering to the findings from the following research paper:
        https://www.researchgate.net/publication/282441155_A_new_classification_method_by_using_Lorentzian_distance_metric
        (not implemented yet)
        '''
        pass

    def LoadModel(self, model_key: str) -> object:
        ''' Loads a saved model if it exists in the ObjectStore; returns None otherwise.'''
        if not self.algorithm.ObjectStore.ContainsKey(model_key):
            return None
        file_name = self.algorithm.ObjectStore.GetFilePath(model_key)
        # NOTE(review): joblib.load unpickles the file - only load keys this project wrote itself
        return joblib.load(file_name)


from AlgorithmImports import *
from datetime import date, time, datetime
class LongVol(AlphaModel):
    '''Alpha model that emits long insights for the configured symbols from their indicator value.'''

    def __init__(self, algorithm: QCAlgorithm, long_vol_symbols: list):
        ''' Initialize

        Args:
            algorithm: The algorithm instance; its Time and Portfolio are read in Update
            long_vol_symbols: Tickers this model should track'''
        self.symbol_data_by_symbol = {}
        self.symbols = [Symbol.Create(symbol, SecurityType.Equity, Market.USA).Value for symbol in long_vol_symbols]
        self.algorithm = algorithm

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        ''' Generate insights as new data is fed through the algorithm.

        A symbol that is not held gets a five-minute upward insight when its indicator
        is below -5, or an upward insight lasting until 15:58 when it is above 6.'''
        insights = []
        for symbol, symbol_data in self.symbol_data_by_symbol.items():
            if self.algorithm.Portfolio[symbol].Invested:
                continue
            value = symbol_data.indicator.Current.Value
            if value < -5:
                insights.append(Insight.Price(symbol, timedelta(minutes=5), InsightDirection.Up, 0.001, None, None, 1.0))
            elif value > 6:
                period = self.EndOfTradingDay(self.algorithm.Time)
                # At or after 15:58 the remaining time is zero or negative, which is not a usable
                # insight period. NOTE(review): LEAN is believed to reject periods under one second - confirm
                if period >= timedelta(seconds=1):
                    insights.append(Insight.Price(symbol, period, InsightDirection.Up, 0.001, None, None, 1.0))
        return insights

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        ''' Make one-time actions on securities as they enter or exit the universe.

        Added securities that match the configured symbols get a SymbolData whose consolidator
        is fed the last five history bars; removed ones have their SymbolData disposed.'''
        for security in changes.AddedSecurities:
            if security.Symbol not in self.symbols:
                continue
            symbol_data = SymbolData(algorithm, security.Symbol)
            self.symbol_data_by_symbol[security.Symbol] = symbol_data
            history = algorithm.History[TradeBar](security.Symbol, 5, algorithm.UniverseSettings.Resolution)
            symbol_data.warmup_consolidator(history)
        for security in changes.RemovedSecurities:
            symbol_data = self.symbol_data_by_symbol.pop(security.Symbol, None)
            if symbol_data is not None:
                symbol_data.dispose()

    def EndOfTradingDay(self, current: datetime) -> timedelta:
        ''' Compute and return the timedelta from `current` to 15:58 on the same date.

        The result is zero or negative once `current` is at or past 15:58.'''
        close = datetime.combine(current.date(), time(15, 58))
        return close - current
class SymbolData:
    '''Holds the indicator and the consolidator that feeds it for a single symbol.'''

    def __init__(self, algorithm, symbol):
        '''Creates the indicator and consolidator, registers both with the algorithm and warms up the indicator.

        Args:
            algorithm: The algorithm instance used for registration and warm-up
            symbol: The symbol to track'''
        self.algorithm, self.symbol = algorithm, symbol
        self.indicator = MomentumPercent(2)
        self.consolidator = TradeBarConsolidator(1)
        subscriptions = algorithm.SubscriptionManager
        subscriptions.AddConsolidator(symbol, self.consolidator)
        algorithm.RegisterIndicator(symbol, self.indicator, self.consolidator)
        algorithm.WarmUpIndicator(self.symbol, self.indicator)

    def dispose(self):
        '''Removes this symbol's consolidator from the subscription manager.'''
        subscriptions = self.algorithm.SubscriptionManager
        subscriptions.RemoveConsolidator(self.symbol, self.consolidator)

    def warmup_consolidator(self, history):
        '''Pushes every bar of `history`, in order, into the consolidator.'''
        feed = self.consolidator.Update
        for bar in history:
            feed(bar)
from AlgorithmImports import *
from datetime import date, time, datetime
import pandas as pd
import numpy as np
import joblib
from sklearn.neighbors import KNeighborsClassifier, NearestNeighbors
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.model_selection import GridSearchCV
from scipy.spatial.distance import pdist
from collections import deque
class LorentzianClassification(AlphaModel):
    '''Alpha model that emits long insights from each symbol's classifier signal.'''

    def __init__(self, algorithm: QCAlgorithm, lookback: timedelta):
        ''' Initialize a new instance of LorentzianClassification

        Args:
            algorithm: The algorithm instance; its Time, Portfolio and IsWarmingUp are read in Update
            lookback: History span forwarded to every SymbolData that gets created'''
        self.symbols_data = {}
        self.algorithm = algorithm
        self.lookback = lookback

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        ''' Generate insights as new data is fed through the algorithm.

        Args:
            algorithm: The algorithm instance (the one stored at construction is used)
            data: The current data slice (not read here)
        Returns:
            One upward price insight per symbol that is ready, not held, flagged long,
            and has more than the holding period left before 15:58'''
        insights = []
        # Don't emit insights during warm up
        if self.algorithm.IsWarmingUp:
            return insights
        remaining = self.EndOfTradingDay(self.algorithm.Time)
        # Iterate through each symbol and generate insights
        for sym, obj in self.symbols_data.items():
            # target_shift five-minute bars, minus one minute
            holding_period = (timedelta(minutes=5) * obj.target_shift) - timedelta(minutes=1)
            invested = self.algorithm.Portfolio[sym].Invested
            ready = obj.IsReady() and remaining > holding_period and not invested
            if not ready:
                # Skip only this symbol; returning here would drop every symbol after it
                continue
            if obj.long:
                insights.append(Insight.Price(sym, holding_period, InsightDirection.Up, 0.001, None, None, 1.0))
            # Short insights are currently disabled:
            # if obj.short:
            #     insights.append(Insight.Price(sym, holding_period, InsightDirection.Down, float(0.001), None, None, 1.0))
        return insights

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        ''' Make one-time actions on securities as they enter or exit the universe.

        Args:
            algorithm: The algorithm instance handed to new SymbolData objects
            changes: The securities added to and removed from the universe'''
        for security in changes.AddedSecurities:
            self.symbols_data[security.Symbol] = SymbolData(algorithm, security.Symbol, self.lookback)
        for security in changes.RemovedSecurities:
            symbol_data = self.symbols_data.pop(security.Symbol, None)
            if symbol_data is not None:
                symbol_data.Dispose()

    def EndOfTradingDay(self, current: datetime) -> timedelta:
        ''' Compute and return the timedelta from `current` to 15:58 on the same date.

        The result is zero or negative once `current` is at or past 15:58.'''
        close = datetime.combine(current.date(), time(15, 58))
        return close - current
class SymbolData:
    '''Per-symbol container for the consolidator, indicators, rolling windows and models.

    Five-minute bars from the consolidator update the indicators; every indicator
    update and every consolidated bar is stored in a rolling window. Once all
    windows and indicators are ready, each bar sets the long/short flags and the
    model is retrained whenever the ISO week number changes.'''

    def __init__(self, algorithm: 'QCAlgorithm', symbol: 'Symbol', lookback: 'timedelta'):
        ''' Initialize a new instance of SymbolData

        Args:
            algorithm: The algorithm instance used for history, registration and logging
            symbol: The symbol this object tracks
            lookback: History span requested at construction'''
        self.algorithm = algorithm
        self.symbol = symbol
        self.lookback = lookback
        self.consolidator_period = timedelta(minutes=5)
        # Kept as an attribute; nothing in this class reads it after construction
        self.history = self.algorithm.History(self.symbol, self.lookback, self.algorithm.UniverseSettings.Resolution)
        self.window_size = 666
        self.indicators = []
        self.combinations = []
        self.selected_features = []
        self.rolling_windows = {}
        self.features = pd.DataFrame()
        self.training_data = pd.DataFrame()
        self.models = Models(algorithm=algorithm)
        self.last_trained = 0
        self.required_confidence = 0.65
        self.target_shift = 4
        self.long = False
        self.short = False
        self.proba = None
        # Create indicators; each one is named, wired to its rolling window and stored in self.indicators
        self.adx = self._TrackIndicator(AverageDirectionalIndex(14), 'ADX')
        self.rsi = self._TrackIndicator(RelativeStrengthIndex(14, movingAverageType=2), 'RSI')
        self.cci = self._TrackIndicator(CommodityChannelIndex(14, movingAverageType=2), 'CCI')
        self.sema = self._TrackIndicator(ExponentialMovingAverage(15), 'Short_EMA')
        self.lema = self._TrackIndicator(ExponentialMovingAverage(29), 'Long_EMA')
        self.crossover = self._TrackIndicator(IndicatorExtensions.Over(self.sema, self.lema, 'Crossover'))
        # Combination indicators are treated differently than regular indicators. They don't inherit from
        # IIndicatorWarmUpPeriodProvider and so cannot benefit from WarmUpIndicator,
        # and shouldn't be registered to the consolidator.
        self.combinations.append(self.crossover)
        # Determine which features will be used in machine learning models
        self.selected_features = ['RSI', 'ADX', 'CCI', 'Crossover'] + ['volume']
        # Create a consolidator to update the indicators and a rolling window for consolidated bars.
        self.consolidator = TradeBarConsolidator(self.consolidator_period)
        self.rolling_windows['Price'] = RollingWindow[TradeBar](self.window_size)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator)
        # Create the indicator rolling windows and register the plain indicators to the consolidator
        for indicator in self.indicators:
            self.rolling_windows[indicator.Name] = RollingWindow[IndicatorDataPoint](self.window_size)
            if indicator not in self.combinations:
                self.algorithm.RegisterIndicator(self.symbol, indicator, self.consolidator)
        # Subscribed last so that, assuming handlers run in subscription order, the indicators have
        # already consumed the bar when OnDataConsolidated reads their windows.
        self.consolidator.DataConsolidated += self.OnDataConsolidated

    def _TrackIndicator(self, indicator, name=None):
        ''' Optionally names the indicator, routes its updates into its rolling window, stores and returns it.'''
        if name is not None:
            indicator.Name = name
        # The window is looked up when the event fires, so it may be created after this call
        indicator.Updated += (lambda sender, updated: self.rolling_windows[indicator.Name].Add(updated))
        self.indicators.append(indicator)
        return indicator

    @staticmethod
    def _ClassProbability(model, proba, label) -> float:
        ''' Returns the probability that `proba` assigns to `label`, or 0.0 if the model never saw that label.'''
        classes = list(getattr(model, 'classes_', []))
        if label in classes:
            return float(proba[classes.index(label)])
        return 0.0

    def OnDataConsolidated(self, sender: object, consolidated_bar: 'TradeBar') -> None:
        ''' Performs actions when the consolidator emits a bar: store it, set the signals, and retrain when due.'''
        # Update the consolidated price rolling window
        self.rolling_windows['Price'].Add(consolidated_bar)
        if not self.IsReady():
            if not self.algorithm.IsWarmingUp:
                windows = [ind for ind, win in self.rolling_windows.items() if not win.IsReady]
                indicators = [ind.Name for ind in self.indicators if not ind.IsReady]
                self.algorithm.Debug(f'Waiting on windows: {windows}, indicators: {indicators} : {self.algorithm.Time}')
            return
        # Use the most recent consolidated bar as prediction inputs.
        self.features = self.PrepareInputs(consolidated_bar)
        # If a model is available and not being trained, predict the direction of the next n-bars.
        model = self.models.trained_models['KNN']
        if model is not None and not self.models.training:
            prediction = model.predict(self.features)[0]
            self.proba = model.predict_proba(self.features).flatten()
            # Look probabilities up by label: a training window holding a single class yields a
            # one-element probability vector, so positional indexing would fail.
            long_signal = self._ClassProbability(model, self.proba, 1) > self.required_confidence
            short_signal = self._ClassProbability(model, self.proba, 0) >= self.required_confidence
            # Determine position.
            if prediction and long_signal:
                self.algorithm.Debug(f'{self.last_trained}')
                self.long, self.short = True, False
            elif not prediction and short_signal:
                self.long, self.short = False, True
            else:
                self.long, self.short = False, False
        # Retrain whenever the ISO week number changes
        week = self.algorithm.Time.isocalendar()[1]
        if self.last_trained != week:
            self.last_trained = week
            training_features, training_targets = self.PrepareTrainingData()
            self.models.TrainModels(
                training_data=training_features.join(training_targets, how='inner', on='time'),
            )

    def PrepareTrainingData(self) -> 'tuple[pd.DataFrame, pd.DataFrame]':
        ''' Summarizes the rolling windows into a single DataFrame. Returns (training features, training targets).'''
        # Rebuild the training data from the consolidated price window
        self.training_data = self.algorithm.PandasConverter.GetDataFrame[TradeBar](list(self.rolling_windows['Price'])).loc[self.symbol]
        # Join every indicator window onto the bars by time
        for indicator in self.indicators:
            key = indicator.Name
            wind = self.rolling_windows[key]
            indicator_df = self.algorithm.PandasConverter.GetDataFrame[IndicatorDataPoint](list(wind)).reset_index().drop(columns='symbol').set_index('time').rename(columns={'value': key})
            self.training_data = self.training_data.join(indicator_df, how='inner', on='time')
        # Reverse the order of the DataFrame such that the most recent consolidated bar is the last row
        self.training_data = self.training_data[::-1]
        # Target: 1.0 when the log return target_shift rows ahead exceeds the std of log returns, else 0.0.
        # A return exactly equal to the std counts as 0.0, so the labels are always binary.
        log_returns = np.log(1 + self.training_data.close.pct_change())
        direction = np.sign(log_returns - log_returns.std()).shift(-self.target_shift)
        # Rows without a known future direction stay NaN and are dropped below
        self.training_data['target'] = direction.gt(0).astype(float).where(direction.notna())
        self.training_data = self.training_data.dropna()
        training_features = self.training_data.loc[:, self.selected_features].copy()
        # 'target' was added last, so it is the final column
        training_targets = self.training_data.iloc[:, -1].to_frame()
        return training_features, training_targets

    def PrepareInputs(self, consolidated_bar: 'TradeBar') -> pd.DataFrame:
        ''' Extracts the most recent consolidated data into a single-row DataFrame of the selected features.'''
        latest_values = {x.Name: self.rolling_windows[x.Name][0].Value for x in self.indicators if x.Name in self.selected_features}
        self.features = pd.DataFrame(data=latest_values, index=pd.Series(consolidated_bar.EndTime))
        bar = self.rolling_windows['Price'][0]
        prices = pd.DataFrame(data={'open': bar.Open, 'high': bar.High, 'low': bar.Low, 'close': bar.Close, 'volume': bar.Volume}, index=pd.Series(bar.EndTime))
        self.features = prices.join(self.features)
        return self.features.loc[:, self.selected_features]

    def IsReady(self) -> bool:
        ''' Returns true if all rolling windows and all indicators are ready'''
        windows_ready = all(win.IsReady for win in self.rolling_windows.values())
        indicators_ready = all(x.IsReady for x in self.indicators)
        return windows_ready and indicators_ready

    # If you have a dynamic universe, remove consolidators for the securities removed from the universe
    def Dispose(self) -> None:
        ''' Removes this symbol's consolidator from the subscription manager.'''
        self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)
class Models:
    ' Various models for forward looking insights.'

    def __init__(self, algorithm: 'QCAlgorithm'):
        ''' Initializes a new instance of Models

        Args:
            algorithm: The algorithm instance; its ObjectStore is used by LoadModel'''
        self.algorithm = algorithm
        self.training = False
        self.trained_models = {
            'KNN': None
        }

    def Preprocess(self, training_data: pd.DataFrame) -> 'tuple[pd.DataFrame, pd.DataFrame]':
        ''' Splits the summary DataFrame into (features, targets).

        Features are every column except the last; targets are the 'target' column as a one-column frame.'''
        train_x = training_data.iloc[:, :-1].copy()
        train_y = training_data.target.to_frame()
        return train_x, train_y

    def TrainModels(self, training_data: pd.DataFrame):
        ''' Trains all models. Does nothing if a training run is already flagged as in progress.'''
        if self.training:
            return
        self.training = True
        try:
            training_features, training_targets = self.Preprocess(
                training_data=training_data,
            )
            self.TrainSklearnKNN(
                training_features=training_features,
                training_targets=training_targets,
            )
            self.TradingViewLorentzianClassification(
                training_features=training_features,
                training_targets=training_targets,
            )
            self.ResearchLorentzianClassification(
                training_features=training_features,
                training_targets=training_targets,
            )
        finally:
            # Always clear the flag so a failed run cannot block predictions and retraining forever
            self.training = False

    def TrainSklearnKNN(self, training_features: pd.DataFrame, training_targets: pd.DataFrame):
        ''' Trains, optimizes and stores an SKlearn K-Nearest Neighbors classifier under the 'KNN' key.

        The scaler is part of the estimator being grid-searched, so the neighbor count is
        chosen on the same scaled feature space the stored model predicts on.'''
        was_training = self.training
        self.training = True
        try:
            pipe = Pipeline([
                ('scaler', StandardScaler()),
                ('nearest_neighbors', KNeighborsClassifier()),
            ])
            param_grid = [
                {
                    'nearest_neighbors__n_neighbors': list(np.arange(2, 21, 2)),
                    'nearest_neighbors__leaf_size': list(np.arange(1, 2, 1)),
                    'nearest_neighbors__metric': ['cityblock'],
                }]
            # Flatten the one-column target frame into the 1-D label array
            targets = np.ravel(training_targets)
            gcv = GridSearchCV(estimator=pipe, param_grid=param_grid, cv=10, verbose=0, n_jobs=-1).fit(training_features, targets)
            # With the default refit, best_estimator_ is the winning pipeline refit on all the data
            self.trained_models['KNN'] = gcv.best_estimator_
        finally:
            self.training = was_training
        # Save the model to ObjectStore
        # file_name = self.algorithm.ObjectStore.GetFilePath(model_key)
        # joblib.dump(model, file_name)
        # self.algorithm.ObjectStore.Save(model_key)

    def TradingViewLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame):
        ' Replicates the model logic from the Trading View indicator: Lorentzian Classification (not implemented yet)'
        # Reference pseudo-code from the Trading View script, kept for the future implementation:
        # neighbors = 8
        # distances = deque(maxlen=neighbors)
        # predictions = deque(maxlen=neighbors)
        # lastDistance = -1.0
        # for i = 0 to sizeLoop //{
        #     d = get_lorentzian_distance(i, settings.featureCount, featureSeries, featureArrays)
        #     if d >= lastDistance and i%4 //{
        #         lastDistance := d
        #         array.push(distances, d)
        #         array.push(predictions, math.round(array.get(y_train_array, i)))
        #         if array.size(predictions) > neighbors //{
        #             lastDistance := array.get(distances, math.round(neighbors * (3/4)))
        #             array.shift(distances)
        #             array.shift(predictions)
        #         //}
        #     //}
        # //}
        # prediction := array.sum(predictions)
        pass

    def ResearchLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame):
        ''' Modifies the Trading View Lorentzian Classification by adhering to the findings from the following research paper:
        https://www.researchgate.net/publication/282441155_A_new_classification_method_by_using_Lorentzian_distance_metric
        (not implemented yet)
        '''
        pass

    def LoadModel(self, model_key: str) -> object:
        ''' Loads a saved model if it exists in the ObjectStore; returns None otherwise.'''
        if not self.algorithm.ObjectStore.ContainsKey(model_key):
            return None
        file_name = self.algorithm.ObjectStore.GetFilePath(model_key)
        # NOTE(review): joblib.load unpickles the file - only load keys this project wrote itself
        return joblib.load(file_name)
from AlgorithmImports import *
from datetime import date, time, datetime
class ShortVol(AlphaModel):
    '''Alpha model that tracks the configured symbols; it currently emits no insights.'''

    def __init__(self, algorithm: QCAlgorithm, short_vol_symbols: list):
        '''Stores the algorithm and the tickers to track.

        Args:
            algorithm: The algorithm instance
            short_vol_symbols: Tickers this model should track'''
        self.algorithm = algorithm
        self.symbol_data_by_symbol = {}
        self.symbols = [
            Symbol.Create(ticker, SecurityType.Equity, Market.USA).Value
            for ticker in short_vol_symbols
        ]

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        '''Returns an empty list; the insight logic below is disabled.'''
        # for symbol in self.symbol_data_by_symbol.keys():
        #     if self.symbol_data_by_symbol[symbol].indicator.Current.Value < -5 and not self.algorithm.Portfolio[symbol].Invested:
        #         insights.append(Insight.Price(symbol, timedelta(minutes=5), InsightDirection.Up, float(0.001), None, None, 1.0))
        return []

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        '''Creates and warms up SymbolData for matching added securities; disposes it for removed ones.'''
        for added in changes.AddedSecurities:
            key = added.Symbol
            if key not in self.symbols:
                continue
            self.symbol_data_by_symbol[key] = SymbolData(algorithm, key)
            bars = algorithm.History[TradeBar](key, 5, algorithm.UniverseSettings.Resolution)
            self.symbol_data_by_symbol[key].warmup_consolidator(bars)
        for removed in changes.RemovedSecurities:
            # The default makes the pop a no-op for symbols that were never tracked
            tracked = self.symbol_data_by_symbol.pop(removed.Symbol, None)
            if tracked:
                tracked.dispose()

    def EndOfTradingDay(self, current: datetime) -> timedelta:
        '''Returns the timedelta from `current` to 15:58 on the algorithm's current date.'''
        today = self.algorithm.Time.date()
        cutoff = datetime.combine(today, time(15, 58, 00))
        return cutoff - current
class SymbolData:
    '''Holds the indicator and the consolidator that feeds it for a single symbol.'''

    def __init__(self, algorithm, symbol):
        '''Creates the indicator and consolidator, registers both with the algorithm and warms up the indicator.

        Args:
            algorithm: The algorithm instance used for registration and warm-up
            symbol: The symbol to track'''
        self.algorithm, self.symbol = algorithm, symbol
        self.indicator = MomentumPercent(2)
        self.consolidator = TradeBarConsolidator(1)
        subscriptions = algorithm.SubscriptionManager
        subscriptions.AddConsolidator(symbol, self.consolidator)
        algorithm.RegisterIndicator(symbol, self.indicator, self.consolidator)
        algorithm.WarmUpIndicator(self.symbol, self.indicator)

    def dispose(self):
        '''Removes this symbol's consolidator from the subscription manager.'''
        subscriptions = self.algorithm.SubscriptionManager
        subscriptions.RemoveConsolidator(self.symbol, self.consolidator)

    def warmup_consolidator(self, history):
        '''Pushes every bar of `history`, in order, into the consolidator.'''
        feed = self.consolidator.Update
        for bar in history:
            feed(bar)
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
class SpreadExecutionModel(ExecutionModel):
    '''Execution model that only sends orders while the bid/ask spread is tight.
    Per the original author's note, it is not expected to work with Resolution.Daily because
    Exchange.ExchangeOpen will be false; Minute resolution is the suggested setting.
    '''

    def __init__(self, acceptingSpreadPercent=0.005):
        '''Initializes a new instance of the SpreadExecutionModel class
        Args:
            acceptingSpreadPercent: Largest accepted spread as a fraction of the current price (its absolute value is stored)'''
        # Maximum spread, relative to the current price, at which orders are still sent.
        self.acceptingSpreadPercent = Math.Abs(acceptingSpreadPercent)
        self.targetsCollection = PortfolioTargetCollection()

    def Execute(self, algorithm, targets):
        '''Submits market orders for outstanding targets whose security currently has a favorable spread.
        Args:
            algorithm: The algorithm instance
            targets: The portfolio targets'''
        pending = self.targetsCollection

        # merge the incoming targets into the full set tracked by this model
        pending.AddRange(targets)

        # OrderByMarginImpact and ClearFulfilled are expensive, so bail out early when nothing is tracked
        if pending.IsEmpty:
            return

        for target in pending.OrderByMarginImpact(algorithm):
            symbol = target.Symbol
            # quantity still missing to reach the target
            remaining = OrderSizing.GetUnorderedQuantity(algorithm, target)
            if remaining == 0:
                continue
            if self.SpreadIsFavorable(algorithm.Securities[symbol]):
                algorithm.MarketOrder(symbol, remaining)

        pending.ClearFulfilled(algorithm)

    def SpreadIsFavorable(self, security):
        '''Determines if the spread is in desirable range.
        Args:
            security: The security whose exchange state and quote prices are inspected
        Returns:
            True when the exchange is open, all prices are positive and the relative spread is within the limit'''
        # Outside exchange hours the spread can be extreme, so never trade then
        if not security.Exchange.ExchangeOpen:
            return False
        # Positive prices avoid a zero division and a spurious negative spread percentage
        if not (security.Price > 0 and security.AskPrice > 0 and security.BidPrice > 0):
            return False
        relativeSpread = (security.AskPrice - security.BidPrice) / security.Price
        return relativeSpread <= self.acceptingSpreadPercent
from AlgorithmImports import *
class CapacityLimitedPortfolioConstructionModel(PortfolioConstructionModel):
    '''Portfolio construction model meant to respect a threshold percentage of asset volume
    to avoid market distortion of less liquid assets.
    NOTE(review): the code in this class only performs equal weighting; no volume threshold is applied here.'''

    def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort):
        '''Initialize a new instance of CapacityLimitedPortfolioConstructionModel
        Args:
            rebalance: Rebalancing parameter. An int is converted with Extensions.ToTimeSpan; a timedelta
                       is turned into a function returning the given time plus that period.
                       Any other truthy value is handed to SetRebalancingFunc unchanged.
                       A falsy value (e.g. None) leaves the rebalancing function unset.
            portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
        super().__init__()
        self.portfolioBias = portfolioBias

        # Keep the caller's value: it is what gets registered when no timedelta can be derived.
        asGiven = rebalance
        period = Extensions.ToTimeSpan(rebalance) if isinstance(rebalance, int) else rebalance
        if isinstance(period, timedelta):
            schedule = lambda dt: dt + period
        else:
            schedule = asGiven

        if schedule:
            self.SetRebalancingFunc(schedule)

    def DetermineTargetPercent(self, activeInsights):
        '''Assigns each insight a signed, equally sized share of the portfolio
        Args:
            activeInsights: The active insights to generate a target for
        Returns:
            A dictionary mapping each insight to its target percent'''
        # only non-flat insights that agree with the portfolio bias share the allocation
        participants = sum(
            1 for insight in activeInsights
            if insight.Direction != InsightDirection.Flat and self.RespectPortfolioBias(insight))
        weight = 1.0 / participants if participants != 0 else 0

        return {
            insight: (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat) * weight
            for insight in activeInsights
        }

    def RespectPortfolioBias(self, insight):
        '''Tells whether an insight's direction is allowed by the configured portfolio bias
        Args:
            insight: The insight to create a target for
        Returns:
            True for a Long/Short bias, otherwise whether the insight direction equals the bias'''
        if self.portfolioBias == PortfolioBias.LongShort:
            return True
        return insight.Direction == self.portfolioBias
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
class EqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):
    '''Implementation of IPortfolioConstructionModel that weights all securities equally.
    Each participating security gets a target of 1/N, where N is the number of non-flat insights
    that respect the portfolio bias.
    Insights with direction InsightDirection.Up produce long targets and
    insights with direction InsightDirection.Down produce short targets.'''

    def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort):
        '''Initialize a new instance of EqualWeightingPortfolioConstructionModel
        Args:
            rebalance: Rebalancing parameter. An int is converted with Extensions.ToTimeSpan; a timedelta
                       is turned into a function returning the given time plus that period.
                       Any other truthy value is handed to SetRebalancingFunc unchanged.
                       A falsy value (e.g. None) leaves the rebalancing function unset.
            portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
        super().__init__()
        self.portfolioBias = portfolioBias

        # Remember the raw argument; it is registered as-is unless a timedelta is available.
        rawArgument = rebalance
        if isinstance(rebalance, int):
            step = Extensions.ToTimeSpan(rebalance)
        else:
            step = rebalance

        nextRebalance = (lambda dt: dt + step) if isinstance(step, timedelta) else rawArgument
        if nextRebalance:
            self.SetRebalancingFunc(nextRebalance)

    def DetermineTargetPercent(self, activeInsights):
        '''Computes the signed target percent of every active insight
        Args:
            activeInsights: The active insights to generate a target for
        Returns:
            A dictionary mapping each insight to its target percent'''
        # N counts the insights that actually take part in the allocation
        eligible = [
            insight for insight in activeInsights
            if insight.Direction != InsightDirection.Flat and self.RespectPortfolioBias(insight)
        ]
        share = 1.0 / len(eligible) if eligible else 0

        allocation = {}
        for insight in activeInsights:
            if self.RespectPortfolioBias(insight):
                direction = insight.Direction
            else:
                direction = InsightDirection.Flat
            allocation[insight] = direction * share
        return allocation

    def RespectPortfolioBias(self, insight):
        '''Checks whether the given insight is compatible with the portfolio bias
        Args:
            insight: The insight to create a target for
        Returns:
            True for a Long/Short bias, otherwise whether the insight direction equals the bias'''
        bias = self.portfolioBias
        if bias == PortfolioBias.LongShort:
            return True
        return insight.Direction == bias
from AlgorithmImports import *
class TrailingStopRiskManagementModel(RiskManagementModel):
    '''Provides an implementation of IRiskManagementModel that limits the maximum possible loss
    measured from the highest unrealized profit'''

    def __init__(self, maximumDrawdownPercent = 0.05):
        '''Initializes a new instance of the TrailingStopRiskManagementModel class
        Args:
            maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio compared with the highest unrealized profit, defaults to 5% drawdown'''
        self.maximumDrawdownPercent = abs(maximumDrawdownPercent)
        # symbol -> HoldingsState holding the position side and the trailing absolute holdings value
        self.trailingAbsoluteHoldingsState = {}

    def ManageRisk(self, algorithm, targets):
        '''Manages the algorithm's risk at each time step
        Args:
            algorithm: The algorithm instance
            targets: The current portfolio targets to be assessed for risk
        Returns:
            A list of liquidating PortfolioTarget objects for every holding whose drawdown exceeds the limit'''
        riskAdjustedTargets = []

        for kvp in algorithm.Securities:
            symbol = kvp.Key
            security = kvp.Value

            # Remove if not invested
            if not security.Invested:
                self.trailingAbsoluteHoldingsState.pop(symbol, None)
                continue

            position = PositionSide.Long if security.Holdings.IsLong else PositionSide.Short
            absoluteHoldingsValue = security.Holdings.AbsoluteHoldingsValue
            state = self.trailingAbsoluteHoldingsState.get(symbol)

            # Add newly invested security (if doesn't exist) or reset holdings state (if position changed)
            if state is None or position != state.position:
                state = self.HoldingsState(position, security.Holdings.AbsoluteHoldingsCost)
                self.trailingAbsoluteHoldingsState[symbol] = state

            trailingAbsoluteHoldingsValue = state.absoluteHoldingsValue

            # Check for new max (for long position) or min (for short position) absolute holdings value
            if ((position == PositionSide.Long and trailingAbsoluteHoldingsValue < absoluteHoldingsValue) or
                (position == PositionSide.Short and trailingAbsoluteHoldingsValue > absoluteHoldingsValue)):
                state.absoluteHoldingsValue = absoluteHoldingsValue
                continue

            drawdown = abs((trailingAbsoluteHoldingsValue - absoluteHoldingsValue) / trailingAbsoluteHoldingsValue)

            if self.maximumDrawdownPercent < drawdown:
                # forget the trailing state so a later re-entry starts from a fresh reference value
                self.trailingAbsoluteHoldingsState.pop(symbol, None)
                # liquidate
                riskAdjustedTargets.append(PortfolioTarget(symbol, 0))

        return riskAdjustedTargets

    class HoldingsState:
        '''Position side plus the trailing absolute holdings value tracked for one symbol'''

        def __init__(self, position, absoluteHoldingsValue):
            self.position = position
            self.absoluteHoldingsValue = absoluteHoldingsValue


# This import was fused onto the end of the previous statement; it belongs on its own line.
from AlgorithmImports import *
from clr import GetClrType as typeof
from Selection.UniverseSelectionModel import UniverseSelectionModel
from itertools import groupby
class ManualUniverseSelectionModel(UniverseSelectionModel):
    '''Implementation of IUniverseSelectionModel that simply subscribes to a fixed set of symbols'''

    def __init__(self, symbols = list(), universeSettings = None):
        '''Initializes the model from a collection of tickers
        Args:
            symbols: Tickers; each one is turned into a US equity symbol with Symbol.Create
            universeSettings: Universe settings to use; when None the algorithm's settings are used in CreateUniverses'''
        self.universeSettings = universeSettings
        self.MarketHours = MarketHoursDatabase.FromDataFolder()
        self.symbols = [ Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in symbols ]

        # register every created symbol in the symbol cache under its ticker value
        for created in self.symbols:
            SymbolCache.Set(created.Value, created)

    def CreateUniverses(self, algorithm):
        '''Creates the universes for this algorithm. Called once after IAlgorithm.Initialize
        Args:
            algorithm: The algorithm instance to create universes for
        Returns:
            The universes to be used by the algorithm'''
        settings = algorithm.UniverseSettings if self.universeSettings is None else self.universeSettings
        resolution = settings.Resolution
        dataType = typeof(Tick) if resolution == Resolution.Tick else typeof(TradeBar)

        # one universe per (market, security type) pair; groupby needs the symbols sorted by the same key
        marketAndType = lambda s: (s.ID.Market, s.SecurityType)
        self.symbols = sorted(self.symbols, key=marketAndType)

        universes = []
        for (market, securityType), members in groupby(self.symbols, marketAndType):
            securityTypeString = Extensions.GetEnumString(securityType, SecurityType)
            universeSymbol = Symbol.Create(f"manual-universe-selection-model-{securityTypeString}-{market}", securityType, market)

            if securityType != SecurityType.Base:
                entry = self.MarketHours.GetEntry(market, None, securityType)
            else:
                # add an entry for this custom universe symbol -- the time zone is not really known,
                # but AddData sets it to TimeZones.NewYork; since this is a manual universe that does
                # nothing with data, the time zone does not actually matter.
                symbolString = MarketHoursDatabase.GetDatabaseSymbolKey(universeSymbol)
                alwaysOpen = SecurityExchangeHours.AlwaysOpen(TimeZones.NewYork)
                entry = self.MarketHours.SetEntry(market, symbolString, securityType, alwaysOpen, TimeZones.NewYork)

            config = SubscriptionDataConfig(dataType, universeSymbol, resolution, entry.DataTimeZone, entry.ExchangeHours.TimeZone, False, False, True)
            universes.append( ManualUniverse(config, settings, list(members)))

        return universes
from AlgorithmImports import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from itertools import groupby
from math import ceil
class QC500UniverseSelectionModel(FundamentalUniverseSelectionModel):
    '''Defines the QC500 universe as a universe selection model for framework algorithm
    For details: https://github.com/QuantConnect/Lean/pull/1663'''

    def __init__(self, filterFineData = True, universeSettings = None):
        '''Initializes a new default instance of the QC500UniverseSelectionModel'''
        super().__init__(filterFineData, universeSettings)
        self.numberOfSymbolsCoarse = 750
        self.numberOfSymbolsFine = 500
        # dollar volume of the symbols kept by the latest coarse selection
        self.dollarVolumeBySymbol = {}
        # month of the last completed selection; -1 until the first one succeeds
        self.lastMonth = -1

    def SelectCoarse(self, algorithm, coarse):
        '''Performs coarse selection for the QC500 constituents.
        The stocks must have fundamental data
        The stock must have positive previous-day close price
        The stock must have positive volume on the previous trading day'''
        # a selection already completed this month: keep the current universe
        if self.lastMonth == algorithm.Time.month:
            return Universe.Unchanged

        candidates = [c for c in coarse if c.HasFundamentalData and c.Volume > 0 and c.Price > 0]
        candidates.sort(key = lambda c: c.DollarVolume, reverse=True)
        mostLiquid = candidates[:self.numberOfSymbolsCoarse]

        self.dollarVolumeBySymbol = {c.Symbol:c.DollarVolume for c in mostLiquid}

        # Nothing met the criteria: leave the universe unchanged. self.lastMonth is not
        # updated, so a new selection is attempted on the next call.
        if not self.dollarVolumeBySymbol:
            return Universe.Unchanged

        # return the symbols in descending dollar-volume order
        return list(self.dollarVolumeBySymbol)

    def SelectFine(self, algorithm, fine):
        '''Performs fine selection for the QC500 constituents
        The company's headquarter must in the U.S.
        The stock must be traded on either the NYSE or NASDAQ
        At least half a year since its initial public offering
        The stock's market cap must be greater than 500 million'''
        def meetsCriteria(f):
            return (f.CompanyReference.CountryId == "USA"
                    and f.CompanyReference.PrimaryExchangeID in ["NYS","NAS"]
                    and (algorithm.Time - f.SecurityReference.IPODate).days > 180
                    and f.MarketCap > 5e8)

        sectorOf = lambda f: f.CompanyReference.IndustryTemplateCode
        dollarVolumeOf = lambda f: self.dollarVolumeBySymbol[f.Symbol]

        bySector = sorted(filter(meetsCriteria, fine), key = sectorOf)
        total = len(bySector)

        # Nothing met the criteria: leave the universe unchanged. self.lastMonth is not
        # updated, so a new selection is attempted on the next call.
        if total == 0:
            return Universe.Unchanged

        # all checks passed, so this month's selection counts as done
        self.lastMonth = algorithm.Time.month

        fraction = self.numberOfSymbolsFine / total
        selected = []

        # take the same fraction of top dollar-volume stocks from every sector
        for _, members in groupby(bySector, sectorOf):
            ranked = sorted(members, key = dollarVolumeOf, reverse = True)
            quota = ceil(len(ranked) * fraction)
            selected.extend(ranked[:quota])

        selected.sort(key = dollarVolumeOf, reverse=True)
        return [f.Symbol for f in selected[:self.numberOfSymbolsFine]]
# Generic Imports
from AlgorithmImports import *
# Import Alphas
from project.Alphas.LongVol import LongVol
from project.Alphas.ShortVol import ShortVol
from project.Alphas.LorentzianKNN import LorentzianClassification
# Import Universes
from project.Universes.QC500 import QC500UniverseSelectionModel
from project.Universes.ManualSelection import ManualUniverseSelectionModel
# Import Portfolio Constructors
from project.Portfolio_Constructions.EqualWeightPortfolioConstruction import EqualWeightingPortfolioConstructionModel
# Import Execution Models
from project.Executions.Execution import SpreadExecutionModel
# Import Risk Management
from project.Risk_Managements.RiskManagement import TrailingStopRiskManagementModel
# Main.
class VolatilityTrader(QCAlgorithm):
def Initialize(self):
    '''Configure the backtest: dates, cash, brokerage, data resolution and the framework models.'''
    # Backtest window and capital
    self.SetStartDate(2022, 5, 1)
    self.SetCash(1000000)

    # Data settings: minute bars, with a 50-day lookback shared by the alpha and the warm-up
    self.lookback = lookback = timedelta(days=50)
    self.UniverseSettings.Resolution = Resolution.Minute

    # Framework models
    self.SetBrokerageModel(InteractiveBrokersBrokerageModel())
    self.SetUniverseSelection(ManualUniverseSelectionModel(['UVIX']))
    self.AddAlpha(LorentzianClassification(algorithm=self, lookback=lookback))
    self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())

    # Alternative configurations, currently disabled:
    # self.SetUniverseSelection(ManualUniverseSelectionModel(['UVIX', 'SVXY']))
    # self.AddAlpha(LongVol(algorithm=self, long_vol_symbols=['UVIX']))
    # self.AddAlpha(ShortVol(algorithm=self, short_vol_symbols=['SVXY']))
    # self.SetExecution(SpreadExecutionModel())
    # self.SetRiskManagement(TrailingStopRiskManagementModel(0.03))

    # Warm up over the same lookback at the universe resolution
    self.SetWarmup(lookback, self.UniverseSettings.Resolution)
def OnData(self, data: Slice):
if self.IsWarmingUp: return