| Overall Statistics |
|
Total Orders 189 Average Win 3.68% Average Loss -2.66% Compounding Annual Return 103.380% Drawdown 15.800% Expectancy 0.267 Start Equity 100000 End Equity 192226.4 Net Profit 92.226% Sharpe Ratio 2.031 Sortino Ratio 2.548 Probabilistic Sharpe Ratio 79.824% Loss Rate 47% Win Rate 53% Profit-Loss Ratio 1.38 Alpha 0.565 Beta 0.661 Annual Standard Deviation 0.329 Annual Variance 0.108 Information Ratio 1.584 Tracking Error 0.323 Treynor Ratio 1.009 Total Fees $2158.78 Estimated Strategy Capacity $35000000.00 Lowest Capacity Asset VX YNNC8UBM7FMX Portfolio Turnover 34.34% |
# region imports
from AlgorithmImports import *
# endregion
func_lower_pct = lambda x: (x['spot_price'] - x['lower_value']) / x['spot_price'] * 100
#func_IV_rank = lambda x: x['implied_vol_rank'] * 100
func_IV_percentile = lambda x: x['implied_vol_percentile'] * 100
func_strdd_sma = lambda x: x['straddle_pct_sma10'] / x['straddle_pct_sma20'] - 1
func_PVol = lambda x: x['PVol']
DR_tickers_selected = ['AAPL', "SLV", 'QQQ', 'SPY', 'AMZN', 'NVDA', 'MSFT', 'TSLA', 'TLT', 'GOOGL', 'GLD', 'AVGO', 'META']
def get_DR_items(tickers):
output = {}
for ticker in tickers:
output[f'DR_{ticker}_lower_pct'] = {'ticker': ticker, 'col_date': 'Date', 'func': func_lower_pct}
output[f'DR_{ticker}_IV_percentile'] = {'ticker': ticker, 'col_date': 'Date', 'func': func_IV_percentile}
#output[f'DR_{ticker}_strdd_sma_change'] = {'ticker': ticker, 'col_date': 'Date', 'func': func_strdd_sma}
#output[f'DR_{ticker}_PVol'] = {'ticker': ticker, 'col_date': "Date", 'func': func_PVol}
return output
general_setting = {
###### Control Buttons (top) #######
'vix_up_quantile': 0.3,
'vix_down_quantile': 0.7,
'n_champions': 2,
'min_prediction_confidence': 0.4,
'min_model_score': 0.3,
'feature_importance_threshold': 0.9,
'training_lookback': 200,
'train_test_split_ratio': 0.75,
"TP_level": 0.1,
"SL_level": -0.08,
##### Control Buttons (bottom) ######
'data_': {
# index
"VIX":
{"type": "index", "source": "QC"},
"SPX":
{"type": "index", "source": "QC"},
"NDX":
{"type": "index", "source": "QC"},
# equity
"UUP":
{"type": "equity", "source": "QC"},
# "IBIT":
# {"type": "equity", "source": "QC"},
# future
"VX":
{"type": "future", "source": Futures.Indices.VIX},
"CL":
{"type": "future", "source": Futures.Energies.CrudeOilWTI},
"GC":
{"type": "future", "source": Futures.Metals.GOLD},
"BTC":
{'type': "future", "source": Futures.Currencies.MICRO_BTC},
"ZN":
{"type": "future", "source": Futures.Financials.Y_10_TREASURY_NOTE},
# "DX":
# {"type": "future", "source": Futures.Currencies.USD},
# other data
"VVIX":
{"type": "data", "source": CBOE},
"USTYCR":
{"type": "data", "source": USTreasuryYieldCurveRate},
# "10_year_yield":
# {"type": "external",
# "source": "https://raw.githubusercontent.com/deerfieldgreen/us-department-treasury/refs/heads/main/data/daily-treasury-rates.csv",
# "col_date": "Date",
# "col_val": "10 Yr"}
},
'DR_data': {} #get_DR_items(DR_tickers)
# {
# #"DR_spy_up_pct": {"ticker": 'SPY', 'col_date': 'Date', 'func': func_upper_pct},
# 'DR_spy_lower_pct': {'ticker': 'SPY', 'col_date': 'Date', 'func': func_lower_pct},
# 'DR_spy_IV_percentile': {'ticker': 'SPY', 'col_date': 'Date', 'func': func_IV_percentile},
# 'DR_spy_strdd_sma_change': {'ticker': "SPY", 'col_date': 'Date', 'func': func_strdd_sma},
# 'DR_spy_PVol': {'ticker': "SPY", 'col_date': "Date", 'func': func_PVol},
# #'DR_qqq_up_pct': {'ticker': 'QQQ', 'col_date': 'Date', 'func': func_upper_pct},
# 'DR_qqq_lower_pct': {'ticker': 'QQQ', 'col_date': 'Date', 'func': func_lower_pct},
# 'DR_qqq_IV_percentile': {'ticker': 'QQQ', 'col_date': 'Date', 'func': func_IV_percentile},
# 'DR_qqq_strdd_sma_change': {'ticker': "QQQ", 'col_date': 'Date', 'func': func_strdd_sma},
# 'DR_qqq_PVol': {'ticker': "QQQ", 'col_date': "Date", 'func': func_PVol},
# #'DR_gld_up_pct': {'ticker': "GLD", 'col_date': 'Date', 'func': func_upper_pct},
# 'DR_gld_lower_pct': {'ticker': 'GLD', 'col_date': 'Date', 'func': func_lower_pct},
# 'DR_gld_IV_percentile': {'ticker': 'GLD', 'col_date': 'Date', 'func': func_IV_percentile},
# 'DR_gld_strdd_sma_change': {'ticker': "GLD", 'col_date': 'Date', 'func': func_strdd_sma},
# 'DR_gld_PVol': {'ticker': "GLD", 'col_date': "Date", 'func': func_PVol},
# #'DR_tlt_up_pct': {'ticker': "TLT", 'col_date': 'Date', 'func': func_upper_pct},
# 'DR_tlt_lower_pct': {'ticker': 'TLT', 'col_date': 'Date', 'func': func_lower_pct},
# 'DR_tlt_IV_percentile': {'ticker': 'TLT', 'col_date': 'Date', 'func': func_IV_percentile},
# 'DR_tlt_strdd_sma_change': {'ticker': "TLT", 'col_date': 'Date', 'func': func_strdd_sma},
# 'DR_tlt_PVol': {'ticker': "TLT", 'col_date': "Date", 'func': func_PVol},
# #'DR_nvda_up_pct': {'ticker': "NVDA", 'col_date': 'Date', 'func': func_upper_pct},
# 'DR_nvda_lower_pct': {'ticker': 'NVDA', 'col_date': 'Date', 'func': func_lower_pct},
# 'DR_nvda_IV_percentile': {'ticker': 'NVDA', 'col_date': 'Date', 'func': func_IV_percentile},
# 'DR_nvda_strdd_sma_change': {'ticker': "NVDA", 'col_date': 'Date', 'func': func_strdd_sma},
# 'DR_nvda_PVol': {'ticker': "NVDA", 'col_date': "Date", 'func': func_PVol},
# #'DR_aapl_up_pct': {'ticker': "AAPL", 'col_date': 'Date', 'func': func_upper_pct},
# 'DR_aapl_lower_pct': {'ticker': 'AAPL', 'col_date': 'Date', 'func': func_lower_pct},
# 'DR_aapl_IV_percentile': {'ticker': 'AAPL', 'col_date': 'Date', 'func': func_IV_percentile},
# 'DR_aapl_strdd_sma_change': {'ticker': "AAPL", 'col_date': 'Date', 'func': func_strdd_sma},
# 'DR_aapl_PVol': {'ticker': "AAPL", 'col_date': "Date", 'func': func_PVol},
# },
}
# region imports
from AlgorithmImports import *
from config import general_setting
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
# endregion
# Your New Python File
class Data_Processor:
def __init__(self):
self.data_ = pd.DataFrame()
self.X_PCA = pd.DataFrame()
self.X_selected = pd.DataFrame()
self.Y = pd.DataFrame()
self.X_test = pd.DataFrame()
self.X_test_PCA = pd.DataFrame()
self.X_test_selected = pd.DataFrame()
self.scalar = StandardScaler()
self.pca = PCA(n_components='mle', svd_solver='auto')
self.pca_to_keep = 0
self.cols_X_PCA = []
self.features = []
self.processed = False
#%% def process_data()
def process_data(self, data_dict_list, algorithm, predict='False'):
if len(data_dict_list) < general_setting['training_lookback']+30:
return -1
self.store_data(data_dict_list, algorithm, predict=predict)
if predict == 'True':
X_test_standardized = self.scalar.transform(self.X_test)
X_test_PCA = self.pca.transform(X_test_standardized)
X_test_PCA = X_test_PCA[:,:self.pca_to_keep]
X_test_PCA = pd.DataFrame(X_test_PCA, columns=self.cols_X_PCA)
self.X_test_PCA = X_test_PCA
self.X_test_selected = self.X_test_PCA[self.features]
return
self.PCA_analysis(algorithm)
self.parameter_selection(algorithm)
return 1
#%% def store_data(): To process the data for model input and output
def store_data(self, data_dict_list, algorithm, predict='False'):
df = pd.DataFrame(data_dict_list).ffill()
df = df[-general_setting['training_lookback']+30:].copy()
df = df.set_index(['date'], drop=True)
close_cols = []
open_cols = []
indicator_cols = []
DR_cols = []
Y_col = ''
for x in list(df.columns):
if x.startswith('Close'):
close_cols.append(x)
elif x.startswith('Open'):
open_cols.append(x)
elif x.startswith('I'):
indicator_cols.append(x)
elif x.startswith('DR'):
DR_cols.append(x)
elif x.startswith('Y'):
Y_col = x
# Daily and Overnight variable
for col in open_cols:
var = col[5:]
close_col = [col for col in close_cols if col.endswith(var)][0]
df['%_'+var+'_d'] = (df[col] - df[col].shift(1)) / df[col].shift(1)
df['%_'+var+'_night'] = (df[col] - df[close_col].shift(1)) / df[close_col].shift(1)
# Historical/lagging variables
for col in open_cols:
var = col[5:]
df['%_'+var+'_MA9'] = df['%_'+var+'_d'].rolling(9).mean()
df['%_'+var+'_MA21'] = df['%_'+var+'_d'].rolling(21).mean()
df['%_'+var+'_MA30'] = df['%_'+var+'_d'].rolling(30).mean()
df = df.drop(columns = close_cols)
df = df.drop(columns = open_cols)
Y_var = Y_col[2:]
df['%_'+Y_var] = (df[Y_col].shift(-1) - df[Y_col]) / df[Y_col]
# Output classes classification
df.loc[df['%_'+Y_var] >= algorithm.vx_up_threshold, 'signal'] = 1
df.loc[df['%_'+Y_var] <= algorithm.vx_down_threshold, 'signal'] = -1
df.loc[(df['%_'+Y_var] < algorithm.vx_up_threshold) & (df['%_'+Y_var] > algorithm.vx_down_threshold), 'signal'] = 0
df = df.drop(columns=[Y_col, '%_'+Y_var])
if predict == 'True':
cols_X = list(df.columns)
cols_X.remove('signal')
df_test = df[cols_X]
self.X_test = df_test.iloc[-1:, :]
return
df = df.dropna(axis=0)
self.data_ = df
algorithm.debug(F'df: {df}')
algorithm.debug(f'columns: {self.data_.columns}')
algorithm.debug(F'{algorithm.Time}: Store data ({len(self.data_)},{len(list(self.data_.columns))})')
#%% def PCA_analysis(): Perform PCA, only keep 90% variance, and name the columns
# Everytime it runs, it sets self.pca_to_keep, self.cols_X_PCA.
def PCA_analysis(self, algorithm):
cols_X = list(self.data_.columns)
cols_X.remove('signal')
X = self.data_[cols_X]
self.Y = self.data_[['signal']]
self.scalar.fit(X)
X_standardized = self.scalar.transform(X)
self.pca.fit(X_standardized)
X_pca = self.pca.transform(X_standardized)
# Kaiser's criterion
eigenvalues = self.pca.explained_variance_
self.pca_to_keep = 0
for i in range(len(eigenvalues)):
if eigenvalues[i] < 1:
self.pca_to_keep = i
break
X_pca = X_pca[:,:self.pca_to_keep]
self.cols_X_PCA = [f'col_{i}' for i in range(len(X_pca[0]))]
X_pca = pd.DataFrame(X_pca, columns=self.cols_X_PCA, index = self.data_.index)
self.X_PCA = X_pca
algorithm.debug(f'{algorithm.Time}: PCA ({len(self.X_PCA)},{len(list(self.X_PCA.columns))})')
#%% def parameter_selection(): feature selection using Random Forest, keep 90% explaning power
# Every time it runs, it sets self.features
def parameter_selection(self, algorithm, method = 'Random_Forest'):
cols_X = list(self.X_PCA.columns)
clf = RandomForestClassifier()
clf.fit(self.X_PCA, self.Y)
x = pd.Series(clf.feature_importances_, cols_X)
x = x.sort_values(ascending=False)
x_cum = x.cumsum()
features = []
for key, value in x_cum.items():
if value < general_setting['feature_importance_threshold']:
features.append(key)
self.features = features
self.X_selected = self.X_PCA[self.features]
x['Time'] = algorithm.Time.date()
algorithm.feature_importance_container.append(x.to_dict())
self.processed = True
algorithm.debug(f'{algorithm.Time}, selected features ({len(self.features)}/{len(cols_X)}): {self.features}')
#%% def has_data()
def has_data(self):
if len(self.X_selected) > 0 and len(self.Y) > 0 and len(self.X_selected) == len(self.Y):
return True
else:
return False
# region imports from AlgorithmImports import * # endregion #cloud-id: 19880132 #label: volatility
# region imports
from AlgorithmImports import *
from data_processor import *
from QuantConnect.DataSource import *
from QuantConnect.Python import PythonQuandl
from pandas.tseries.offsets import BDay
from arch import arch_model
import pickle
from datetime import datetime
from config import *
from models import *
import numpy as np
import random
import pickle
# endregion
class AlertOrangeFox(QCAlgorithm):
#%% def initialize()
def initialize(self):
self.debug(f'{self.project_id}')
self.set_start_date(2024, 1, 1)
self.set_end_date(2024, 12, 1)
#self.set_start_date(2024, 11, 1)
self.set_warm_up(int(60*8*general_setting['training_lookback']), Resolution.MINUTE)
self.set_cash(100000)
self.random_seed = 60
self.ticker_symbol_map = {}
self.symbol_ticker_map = {}
self.daily_range_items = []
for ticker in general_setting['data_']:
if general_setting['data_'][ticker]['type'] == 'index':
sym = self.add_index(ticker, Resolution.MINUTE).symbol
self.ticker_symbol_map[ticker] = sym
self.symbol_ticker_map[sym] = ticker
elif general_setting['data_'][ticker]['type'] == 'equity':
sym = self.add_equity(ticker, Resolution.MINUTE).symbol
self.ticker_symbol_map[ticker] = sym
self.symbol_ticker_map['sym'] = ticker
elif general_setting['data_'][ticker]['type'] == 'data':
sym = self.add_data(general_setting['data_'][ticker]['source'], ticker, Resolution.DAILY).symbol
self.ticker_symbol_map[ticker] = sym
self.symbol_ticker_map[sym] = ticker
elif general_setting['data_'][ticker]['type'] == 'future':
if ticker == 'VX':
self.VX = self.AddFuture(general_setting['data_'][ticker]['source'], Resolution.MINUTE, dataNormalizationMode=DataNormalizationMode.BackwardsRatio, dataMappingMode=DataMappingMode.LAST_TRADING_DAY, extendedMarketHours=True)
self.VX_sym = self.VX.symbol
self.ticker_symbol_map[ticker] = self.VX_sym
self.symbol_ticker_map[self.VX_sym] = ticker
else:
future = self.AddFuture(general_setting['data_'][ticker]['source'], Resolution.MINUTE, dataNormalizationMode=DataNormalizationMode.BackwardsRatio, dataMappingMode=DataMappingMode.OpenInterest, extendedMarketHours=True)
future_sym = future.symbol
self.ticker_symbol_map[ticker] = future_sym
self.symbol_ticker_map[future_sym] = ticker
elif general_setting['data_'][ticker]['type'] == 'external':
self.ticker_symbol_map[ticker] = ticker
self.symbol_ticker_map[ticker] = ticker
# Bollinger Band
self.SPX_bb = self.bb(self.ticker_symbol_map['SPX'], 30, 2, resolution=Resolution.MINUTE)
self.GC_bb = self.bb(self.ticker_symbol_map['GC'], 30, 2, resolution=Resolution.MINUTE)
self.models = {}
self.initialize_models()
self.prediction = 0
self.reinvested = False
self.morning_attempt = True
self.data_dict_list = []
self.data_dict = {}
self.Schedule.On(self.DateRules.Every([1]), self.TimeRules.At(2, 0), self.train_ML_model)
self.TP_level = general_setting['TP_level']
self.SL_level = general_setting['SL_level']
self.message = ""
# Historical Daily Range data
self.DR_data_dict = {}
self.DR_data = pickle.loads(self.object_store.read_bytes('Historical_Daily_Range/HDR_historical.pkl'))
DR_tickers = list(set(self.DR_data['ticker']))
for ticker in DR_tickers:
if ticker not in self.ticker_symbol_map:
self.add_equity(ticker, Resolution.HOUR)
df = self.DR_data[self.DR_data.ticker == ticker]
df = df.set_index(['Date'])
df.index = pd.to_datetime(df.index)
self.DR_data_dict[ticker] = df
general_setting['DR_data'] = get_DR_items(DR_tickers_selected)
for item in general_setting['DR_data']:
self.daily_range_items.append(item)
self.predictions_count = {0: 0, 1: 0, -1:0} #{1: 0, -1: 0, 0: 0, 2: 0, -2:0}
self.value_to_plot = 0
self.report_container = {}
for signal in ['1', '0', '-1']: #['2', '1', '0', '-1', '-2']
self.report_container[signal] = {1:0, 0:0, -1:0}#{2:0, 1:0, 0:0, -1:0, -2:0}
self.feature_importance_container = []
self.signal_probabilities_container = []
self.threshold_container = []
self.PCA_eigenvalue_container = []
self.webhook_1 = "https://hooks.slack.com/services/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"
self.webhook_2 = 'https://discord.com/api/webhooks/1309186566563823696/EyQyyH3Ky6yKiCWbOsYFVKWdOhm6aYAgeT1swx_TLuJkSf2DnbjFTuEG_BY93z8xAWzU'
self.data_processor_ = Data_Processor()
self.model_rankings = {}
self.chosen_models = {}
self.passive_holding_days = 0
#%% def initialize_models()
def initialize_models(self):
# Garch Model
# SPX_history = self.history(self.SPX_sym, 100, Resolution.HOUR)[:-1]
# SPX_history = SPX_history.droplevel(['symbol'])
# returns = np.log(SPX_history['close'] / SPX_history['close'].shift(1))
# returns = returns.dropna()
# self.models['GARCH'] = GARCH_model(returns, self)
self.models['Random_Forest'] = Random_Forest_model()
self.models['XGBoost'] = XGBoost_model()
self.models['SVM'] = SVM_model()
self.models['KNN'] = KNN_model()
#%% def train_ML_model()
def train_ML_model(self):
# calculate VX thresholds and TP/SL levels
if len(self.data_dict_list) >= 60:
df = pd.DataFrame(self.data_dict_list[-50:])[['Y_VX']]
df['pct'] = df['Y_VX'] / df['Y_VX'].shift() - 1
df = df.dropna(axis=0)
pct_std = df['pct'].std()
self.vx_up_threshold = np.quantile(df.loc[df.pct>0, 'pct'], general_setting['vix_up_quantile'])
self.vx_down_threshold = np.quantile(df.loc[df.pct<0, 'pct'], general_setting['vix_down_quantile'])
self.threshold_container.append({'Time': self.Time.date(), 'UP_threshold': self.vx_up_threshold, 'DOWN_threshold': self.vx_down_threshold})
self.quantile_70 = np.quantile(df['pct'], 0.7)
self.quantile_30 = np.quantile(df['pct'], 0.3)
if self.quantile_70 > 0 and self.quantile_30 < 0:
self.up_down_ratio = self.quantile_70 / -self.quantile_30
self.up_down_ratio = min(2, self.up_down_ratio) if self.up_down_ratio > 1 else max(0.5, self.up_down_ratio)
elif self.quantile_70 < 0:
self.up_down_ratio = 0.5
else:
self.up_down_ratio = 2
#self.debug(f'vx_up_threshold={round(self.vx_up_threshold,3)}, vx_down_threshold={round(self.vx_down_threshold,3)}, up_down_ratio={round(self.up_down_ratio,3)}')
status = self.data_processor_.process_data(self.data_dict_list, self, predict="False")
if status == -1:
return
self.chosen_models = {}
self.model_rankings = {}
if self.data_processor_.has_data():
for model in self.models:
self.models[model].train_model(self.data_processor_.X_selected, self.data_processor_.Y, self)
self.model_rankings[model] = self.models[model].f1_score_training
self.debug(f'\n{model} Training on {self.Time}')
self.model_rankings = dict(sorted(self.model_rankings.items(), key=lambda x: x[1], reverse=True))
self.debug(f'model rankings: {self.model_rankings}')
for i in range(general_setting['n_champions']):
if i == 0:
self.chosen_models[list(self.model_rankings.keys())[i]] = self.model_rankings[list(self.model_rankings.keys())[i]]
elif self.model_rankings[list(self.model_rankings.keys())[i]] > general_setting['min_model_score']:
self.chosen_models[list(self.model_rankings.keys())[i]] = self.model_rankings[list(self.model_rankings.keys())[i]]
total_score = sum(list(self.chosen_models.items())[i][1] for i in range(len(self.chosen_models)))
for model in self.chosen_models:
self.chosen_models[model] = self.chosen_models[model] / total_score
self.debug(f'selected {model} with score {self.chosen_models[model]}')
# model_rankings_list = list(self.model_rankings.items())
# for i in range(general_setting['n_champions']):
# self.chosen_models[model_rankings_list[i][0]] = model_rankings_list[i][1]/total_score
# self.debug(f'selected {model_rankings_list[i][0]} with score {model_rankings_list[i][1]/total_score}')
#%% def fetch_data(): Get the data that are available in the slice.
def fetch_data(self, data, for_close = False):
if for_close and self.data_dict == {}:
return
if not for_close:
self.data_dict = {}
self.data_dict['date'] = self.Time.date()
prefix = 'Open_' if not for_close else 'Close_'
for ticker, sym in self.ticker_symbol_map.items():
# Treasury yield data
if general_setting['data_'][ticker]['type'] == "external" or general_setting['data_'][ticker]['type'] == 'data':
if ticker == "10_year_yield":
data_str = self.download(general_setting['data_']["10_year_yield"]['source'])
data_str_list = data_str.split('\n')
columns = []
rows = []
top_row = True
for row in data_str_list:
content = row.split(',')
if top_row:
columns = content
top_row = False
else:
rows.append(content)
data_df = pd.DataFrame(rows, columns=columns)
data_df = data_df.dropna(axis=0)
data_df = data_df.set_index(['Date'])
data_df.index = pd.to_datetime(data_df.index)
data_df = data_df.sort_index()
tenyear_yield = float(data_df[data_df.index.date < self.Time.date()].iloc[-1]['10 Yr'])
self.data_dict['I_r10'] = tenyear_yield
elif general_setting['data_'][ticker]['type'] == 'data':
if ticker == "VVIX":
vvix_value = list(self.history(sym, 10, Resolution.DAILY)['close'])[-1]
if date == self.data_dict['date']:
self.data_dict[prefix+ticker] = vvix_value
if ticker == 'USTYCR' and not for_close:
yield_curve_symbol = self.ticker_symbol_map[ticker]
df = self.history(USTreasuryYieldCurveRate, yield_curve_symbol, 10, Resolution.DAILY)
#data_dict['Close_r10'] = df[-1:]['tenyear'][0]
self.data_dict['I_r10'] = df[-1:]['tenyear'][0]
self.data_dict['I_Rinversion'] = df[-1:]['twoyear'][0] - df[-1:]['tenyear'][0]
continue
# Other data
if data.contains_key(sym):
value = data[sym].close
elif not self.history(sym, 20, Resolution.MINUTE).empty:
value = list(self.history(sym, 20, Resolution.MINUTE)['close'])[-1]
else:
self.debug(f'{sym} data is not retrieved! ')
value = float('nan')
# Y variable
if ticker == 'VX':
self.data_dict['Y_'+ticker] = value
self.data_dict[prefix+ticker] = value
# Input variable
else:
self.data_dict[prefix+ticker] = value
# Daily Range data
if not for_close:
for item in self.daily_range_items:
ticker = general_setting['DR_data'][item]['ticker']
df = self.DR_data_dict[ticker]
df_today = df[df.index.date == self.Time.date()]
value = general_setting['DR_data'][item]['func'](df_today).values
if len(value) == 0:
self.debug(f'{self.Time}: {item} missing')
df = df[df.index.date < self.Time.date()][-1:]
value = general_setting['DR_data'][item]['func'](df).values[0]
else:
value = value[0]
if type(value) == np.ndarray:
self.debug(F'{self.Time} {item}, value={value}')
self.data_dict[item] = round(value, 3)
# Indicators
bb_width_SPX = self.SPX_bb.upper_band.current.value - self.SPX_bb.lower_band.current.value
bb_width_GLD = self.GC_bb.upper_band.current.value - self.GC_bb.lower_band.current.value
self.data_dict['I_SPXbb'] = bb_width_SPX if bb_width_SPX != 0 else float('nan')
self.data_dict['I_GCbb'] = bb_width_GLD if bb_width_GLD != 0 else float('nan')
if for_close:
self.data_dict_list.append(self.data_dict)
#%% def on_data()
def on_data(self, data: Slice):
# Rollover
for symbol, changed_event in data.symbol_changed_events.items():
old_symbol = changed_event.old_symbol
new_symbol = changed_event.new_symbol
tag = f"Rollover - Symbol changed at {self.time}: {old_symbol} -> {new_symbol}"
if old_symbol not in self.portfolio:
continue
quantity = self.portfolio[old_symbol].quantity
self.liquidate(old_symbol, tag=tag)
if quantity != 0: self.market_order(new_symbol, quantity, tag=tag)
# Get real-time statistics for VIX future and portfolio.
self.VX_contract = self.Securities[self.VX.mapped]
self.price_VX = self.Portfolio[self.VX_contract.symbol].Price
self.position_VX = self.Portfolio[self.VX_contract.symbol].Quantity
# 9:35 am: Fetch open data
time_for_data = (self.Time.hour == 9 and self.Time.minute == 35)
if time_for_data:
if not self.is_warming_up:
self.generate_report(self.price_VX)
self.fetch_data(data, for_close=False)
# 3:35 pm: Fetch close data
time_for_close_data = (self.Time.hour == 15 and self.Time.minute == 55)
if time_for_close_data:
self.fetch_data(data, for_close=True)
# # Intraday data
# if data.ContainsKey("SPX") and self.Time.hour == 9 and self.Time.minute == 35:
# spx_price = data["SPX"].Price
# chain = data.option_chains.get(self.ticker_option_map['SPX'].symbol)
# if chain:
# calls = [i for i in chain if i.Right == OptionRight.CALL]
# min_diff = min([abs(c.strike - spx_price) for c in calls])
# calls_ATM = [c for c in calls if abs(c.strike-spx_price) == min_diff]
# iv_call = np.mean([c.implied_volatility for c in calls_ATM])
# puts = [i for i in chain if i.Right == OptionRight.PUT]
# min_diff = min([abs(p.strike - spx_price) for p in puts])
# puts_ATM = [p for p in puts if abs(p.strike-spx_price) == min_diff]
# iv_put = np.mean([p.implied_volatility for p in puts_ATM])
# iv = (iv_call + iv_put) / 2 * 100
# vix_price = data["VIX"].price
# spx_HV = self.calculate_HV("SPX")
# spread = vix_price - spx_HV
# other_data_dict = {"Date": self.Time.date(), "VIX": vix_price, "SPX_IV": iv, "SPX_HV": spx_HV, "spread": spread}
# self.other_data_dict_ls.append(other_data_dict)
# if len(self.other_data_dict_ls) < 250:
# return
# df = pd.DataFrame(self.other_data_dict_ls[-250:])
# min_iv = min(df['SPX_IV'])
# max_iv = max(df['SPX_IV'])
# iv_rank = (iv - min_iv) / (max_iv - min_iv)
# iv_percentile = (df['SPX_IV'] < iv).mean()
if self.is_warming_up:
return
time_for_entry = (self.Time.hour == 9 and self.Time.minute == 40)
if time_for_entry:
obj = {"text": f"Volatility Morning Briefing ({self.Time.date()}): prediction = {self.prediction}"}
obj = json.dumps(obj)
self.Notify.web(self.webhook_1, obj)
self.Notify.web(self.webhook_2, obj)
self.data_processor_.process_data(self.data_dict_list, self, predict='True')
# Get 1) prediction, for trade direction, and 2) confidence score, for bet size
output_probabilities = {1: 0, 0: 0, -1: 0}
prob_up = 0
prob_flat = 0
prob_down = 0
for model in self.models:
if model in self.chosen_models:
prediction = self.models[model].make_prediction(self.data_processor_.X_test_selected, self)
probability_cone = self.models[model].pred_proba_dict
confidence = self.chosen_models[model]
output_probabilities[1] += probability_cone[1] * (self.chosen_models[model])
output_probabilities[0] += probability_cone[0] * (self.chosen_models[model])
output_probabilities[-1] += probability_cone[-1] * (self.chosen_models[model])
self.bet_size = sorted(list(output_probabilities.items()), key = lambda x: x[1], reverse=True)[0][1]
self.prediction = sorted(list(output_probabilities.items()), key = lambda x: x[1], reverse=True)[0][0] if self.bet_size > general_setting['min_prediction_confidence'] else 0
self.debug(f'{self.Time.date()}, prediction = {self.prediction}, bet size = {self.bet_size}')
self.predictions_count[self.prediction] += 1
self.reinvested = False
if self.prediction == 1:
self.passive_holding_days = 0
if not self.portfolio.invested:
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.message = 'Open Long Volatility position (signal)'
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, self.bet_size, tag='Open Long (signal = 1)')
self.TP_level = general_setting['TP_level'] * self.up_down_ratio
elif self.position_VX < 0:
self.message = 'Closed Short Volatility position (change of signal)'
PnL = round(self.portfolio.total_unrealized_profit / self.start_value*100,2) # need to modify
self.liquidate(tag = f'Close Short (signal = 1) [{PnL}%]')
self.debug(F'{self.Time}: {self.message}')
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.message = 'Open Long Volatility position (signal)'
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, self.bet_size, tag = 'Open Long (signal = 1)')
self.TP_level = general_setting['TP_level'] * self.up_down_ratio
elif self.prediction == -1:
self.passive_holding_days = 0
if not self.portfolio.invested:
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, -self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.message = 'Open Short Volatility position (signal) '
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, -self.bet_size, tag='Open Short (signal = -1)')
self.TP_level = general_setting['TP_level'] / self.up_down_ratio
elif self.position_VX > 0:
self.message = 'Close Long Volatility position (change of signal)'
PnL = round(self.portfolio.total_unrealized_profit / self.start_value*100,2) # need to modify
self.liquidate(tag = f'Close Long (signal = -1) [{PnL}%]')
self.debug(F'{self.Time}: {self.message}')
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, -self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.message = 'Open Short Volatility position (signal)'
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, -self.bet_size, tag = 'Open Short (signal = -1)')
self.TP_level = general_setting['TP_level'] / self.up_down_ratio
elif self.prediction == 0:
if self.portfolio.invested:
self.passive_holding_days += 1
if self.passive_holding_days > 2:
self.liquidate()
# Taking Profit / Stop Loss
if self.portfolio.invested and not time_for_entry:
# Stop Loss
if self.portfolio.total_unrealized_profit / self.start_value <= self.SL_level:
self.message = 'Close Long Volatility position (Stop Loss)' if self.position_VX > 0 else 'Close Short Volatility position (Stop Loss)'
PnL = round(self.portfolio.total_unrealized_profit / self.start_value*100,2)
self.liquidate(asynchronous=True, tag = f'Close (STOP LOSS) [{PnL}%]')
self.debug(F'{self.Time}: {self.message}')
self.bet_size = self.bet_size * 0.5
# Flip to short
if self.position_VX > 0 and not self.reinvested:
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.passive_holding_days = 0
self.message = 'Reinvest in Short Volatility position (after SL)'
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, -self.bet_size, tag = 'Flip to short')
self.reinvested = True
self.TP_level = general_setting['TP_level'] / self.up_down_ratio
# Flip to long
elif self.position_VX < 0 and not self.reinvested:
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.passive_holding_days = 0
self.message = 'Reinvest in Long Volatility position (after SL)'
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, self.bet_size, tag = 'Flip to long')
self.reinvested = True
self.TP_level = general_setting['TP_level'] * self.up_down_ratio
# Take Profit
elif self.portfolio.total_unrealized_profit / self.start_value >= self.TP_level:
self.message = 'Close Long Volatility position (Take Profit)' if self.position_VX > 0 else 'Close Short Volatility position (Take Profit)'
PnL = round(self.portfolio.total_unrealized_profit / self.start_value*100,2)
self.liquidate(asynchronous=True, tag = f'Close (TAKE PROFIT) [{PnL}%]')
if self.position_VX > 0:
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.passive_holding_days = 0
self.message = 'Reinvest in Long Volatility position (after TP)'
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, self.bet_size, tag='Consecutive Long entry')
self.TP_level = general_setting['TP_level'] * self.up_down_ratio
elif self.position_VX < 0:
self.entry_price = self.price_VX
self.quantity = self.calculate_order_quantity(self.VX_contract.symbol, self.bet_size)
self.start_value = self.portfolio.total_portfolio_value * self.bet_size
self.passive_holding_days = 0
self.message = 'Reinvest in Short Volatility position (after TP)'
self.debug(F'{self.Time}: {self.message}')
self.set_holdings(self.VX_contract.symbol, -self.bet_size, tag='Consecutive Short entry')
self.TP_level = general_setting['TP_level'] / self.up_down_ratio
#%% def on_order_event(): fire webhooks
def on_order_event(self, order_event: OrderEvent) -> None:
order = self.transactions.get_order_by_id(order_event.order_id)
symbol = order_event.symbol
fill_price = order_event.fill_price
fill_quantity = order_event.fill_quantity
direction = order_event.direction
date = self.Time.date()
hour = self.Time.hour
minute = self.Time.minute
second = str(self.Time.second)
if order_event.status == OrderStatus.FILLED or order_event.status == OrderStatus.PARTIALLY_FILLED:
obj = {"text": f"{self.message}\n<PAPER MONEY> Time: {date} {hour}:{minute}:{second.zfill(2)}, Symbol: {symbol}, Quantity: {fill_quantity}, Price: {fill_price}"}
obj = json.dumps(obj)
self.Notify.web(self.webhook_1, obj)
self.Notify.web(self.webhook_2, obj)
#%% def generate_report()
def generate_report(self, price_VX):
last_VX = self.data_dict_list[-1]['Y_VX']
pct = price_VX / last_VX - 1
if pct >= self.vx_up_threshold:
actual_signal = 1
elif pct <= self.vx_down_threshold:
actual_signal = -1
else:
actual_signal = 0
# elif pct >= -self.vx_flat_threshold and pct <= self.vx_flat_threshold:
# actual_signal = 0
# elif pct > self.vx_down_threshold and pct < -self.vx_flat_threshold:
# actual_signal = -1
# else:
# actual_signal = 1
# if self.prediction == 1:
# self.report_container['1'][actual_signal] += 1
# elif self.prediction == -1:
# self.report_container['-1'][actual_signal] += 1
if self.prediction == 0:
self.report_container['0'][actual_signal] += 1
elif self.prediction == 1:
self.report_container['1'][actual_signal] += 1
elif self.prediction == -1:
self.report_container['-1'][actual_signal] += 1
value_to_plot = 1 if actual_signal == self.prediction else -1
#%% def on_end_of_algorithm
def on_end_of_algorithm(self):
for key, dictt in self.report_container.items():
sum_cases = sum(dictt.values())
self.debug(F'For {key} signal: {dictt[1]}/{sum_cases} are 1, {dictt[0]}/{sum_cases} are 0, {dictt[-1]}/{sum_cases} are -1')
for key, value in self.predictions_count.items():
self.debug(f'There are {value} cases for {key} signal')
self.feature_importance_df = pd.DataFrame(self.feature_importance_container)
self.signal_probabilities_df = pd.DataFrame(self.signal_probabilities_container)
self.threshold_df = pd.DataFrame(self.threshold_container)
self.ObjectStore.SaveBytes("19880132/Feature_Importance.pkl", pickle.dumps(self.feature_importance_df))
self.ObjectStore.SaveBytes('19880132/Signal_Probabilities.pkl', pickle.dumps(self.signal_probabilities_df))
self.ObjectStore.SaveBytes('19880132/Thresholds.pkl',pickle.dumps(self.threshold_df))
self.debug(F'feature_importance: {self.feature_importance_df}')
self.debug(F'signal probability: {self.signal_probabilities_df}')
self.debug(F'threshold: {self.threshold_df}')
# region imports
from AlgorithmImports import *
from arch import arch_model
import pandas as pd
import random
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, KFold, cross_validate, GridSearchCV, cross_val_score, StratifiedKFold
from sklearn import tree
from config import *
from sklearn.feature_selection import RFECV
from bayes_opt import BayesianOptimization
from sklearn.metrics import f1_score
from xgboost import XGBClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.neighbors import KNeighborsClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping
# endregion
# Your New Python File
class ML_model:
def __init__(self):
pass
def evaluate(self, model, test_features, test_labels):
pass
def objective_function(self, n_estimators, max_depth, min_samples_split, min_samples_leaf):
pass
def train_model(self, data_X, data_Y, algorithm):
pass
def make_prediction(self, X_test, algorithm):
pass
#%% GARCH_model
class GARCH_model:
def __init__(self, data, algorithm):
self.model = arch_model(data, mean = 'AR', vol = 'Garch', p = 2, q = 2, dist = 'Normal')
self.model = self.model.fit(update_freq = 5, disp='off')
a = self.model.forecast(horizon=5)
def train(self, data):
self.model = arch_model(data, vol = 'Garch', p = 5, q = 5, dist = 'Normal')
self.model = self.model.fit(update_freq = 5, disp='off')
def predict_variance(self, h):
var_curr = self.model.conditional_volatility.iloc[-1,0 ]
result = self.model.forecast(horizon=h)
var_hat_1 = result.variance.iloc[0, 0]
var_hat_2 = result.variance.iloc[0, 1]
var_hat_3 = result.variance.iloc[0, 2]
return result.variance.iloc[0, :]
#%% Random_Forest_model()
class Random_Forest_model(ML_model):
def __init__(self):
self.model = RandomForestClassifier()
#self.model = tree.DecisionTreeClassifier()
self.X_train = pd.DataFrame()
self.Y_train = pd.DataFrame()
self.X_test = pd.DataFrame()
self.features = []
self.is_ready = False
self.f1_score_training = 0
self.param_bounds = {
'n_estimators': (10, 100), # depending on the sample size (30, 30)
'max_depth': (5, 50), # None
'min_samples_split': (2, 20), # (2, 5)
'min_samples_leaf': (1, 20), # (1, 50) depeneding on the sample size
'random_state': (1, 100),
}
self.prediction = 0
self.pred_proba_dict = {}
def objective_function(self, n_estimators, max_depth, min_samples_split, min_samples_leaf, random_state):
model = RandomForestClassifier(
n_estimators=int(n_estimators),
max_depth=int(max_depth),
min_samples_split=int(min_samples_split),
min_samples_leaf=int(min_samples_leaf),
random_state=int(random_state),
)
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, self.X_train, self.Y_train, cv=kfold, scoring='f1_weighted')
return scores.mean()
def train_model(self, data_X, data_Y, algorithm):
self.X_train = data_X
self.Y_train = data_Y
# Hyperparameter Tuning
optimizer = BayesianOptimization(
f=self.objective_function,
pbounds=self.param_bounds,
random_state=42,
verbose=2
)
optimizer.maximize(init_points=10, n_iter=30)
best_params = optimizer.max['params']
#algorithm.debug(F'{optimizer.max}')
# Evaluate the model
X_train = self.X_train.iloc[:int(len(self.X_train)*general_setting['train_test_split_ratio']), :]
Y_train = self.Y_train.iloc[:int(len(self.Y_train)*general_setting['train_test_split_ratio']), :]
X_test = self.X_train.iloc[int(len(self.X_train)*general_setting['train_test_split_ratio']):, :]
Y_test = self.Y_train.iloc[int(len(self.Y_train)*general_setting['train_test_split_ratio']):, :]
self.model = RandomForestClassifier(max_depth=int(best_params['max_depth']), min_samples_leaf=int(best_params['min_samples_leaf']),
min_samples_split=int(best_params['min_samples_split']), n_estimators=int(best_params['n_estimators']),
random_state=int(best_params['random_state']))
self.model.fit(X_train, Y_train)
self.f1_score_training = f1_score(Y_test, self.model.predict(X_test), average='weighted')
algorithm.plot('f1_score_training', 'Random_Forest', self.f1_score_training)
# Train the model
self.model.fit(self.X_train, self.Y_train)
self.is_ready = True
return
def make_prediction(self, X_test, algorithm):
if not self.is_ready:
algorithm.debug(F'{algorithm.Time}: model is not ready')
return
self.X_test = X_test
predicted_signal = self.model.predict(self.X_test)[0]
classes = self.model.classes_
pred_proba = self.model.predict_proba(self.X_test)[0]
pred_proba_dict = {int(classes[i]): pred_proba[i] for i in range(len(classes))}
self.pred_proba_dict = pred_proba_dict
self.prediction = predicted_signal
pred_proba_dict['Time'] = algorithm.Time.date()
algorithm.signal_probabilities_container.append(pred_proba_dict)
return predicted_signal
#%% XGBoost
class XGBoost_model(ML_model):
def __init__(self):
self.model = XGBClassifier()
#self.model = tree.DecisionTreeClassifier()
self.X_train = pd.DataFrame()
self.Y_train = pd.DataFrame()
self.X_test = pd.DataFrame()
self.features = []
self.is_ready = False
self.f1_score_training = 0
self.param_bounds = {
'max_depth': (3, 10),
'learning_rate': (0.01, 0.3),
'n_estimators': (50, 300),
'gamma': (0, 5),
'min_child_weight': (1, 10),
'subsample': (0.5, 1),
'colsample_bytree': (0.5 ,1),
'random_seed': (1, 100),
}
self.prediction = 0
self.pred_proba_dict = {}
self.label_encoder = LabelEncoder()
def objective_function(self, max_depth, learning_rate, n_estimators, gamma, min_child_weight, subsample, colsample_bytree, random_seed):
# Initialize the model with current hyperparameters
model = XGBClassifier(
max_depth=int(max_depth),
learning_rate=learning_rate,
n_estimators=int(n_estimators),
gamma=gamma,
min_child_weight=min_child_weight,
subsample=subsample,
colsample_bytree=colsample_bytree,
use_label_encoder=False,
eval_metric='mlogloss',
random_seed = int(random_seed),
)
# Perform cross-validation
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, self.X_train, self.Y_train, scoring='f1_weighted', cv=kfold)
# Return the mean accuracy as the optimization target
return scores.mean()
def train_model(self, data_X, data_Y, algorithm):
self.X_train = data_X
self.Y_train = pd.DataFrame(self.label_encoder.fit_transform(data_Y), index=self.X_train.index)
# Hyperparameter Tuning
optimizer = BayesianOptimization(
f=self.objective_function,
pbounds=self.param_bounds,
random_state=42,
verbose=2
)
optimizer.maximize(init_points=10, n_iter=30)
best_params = optimizer.max['params']
#algorithm.debug(F'{optimizer.max}')
# Evaluate the model
X_train = self.X_train.iloc[:int(len(self.X_train)*general_setting['train_test_split_ratio']), :]
Y_train = self.Y_train.iloc[:int(len(self.Y_train)*general_setting['train_test_split_ratio']), :]
X_test = self.X_train.iloc[int(len(self.X_train)*general_setting['train_test_split_ratio']):, :]
Y_test = self.Y_train.iloc[int(len(self.Y_train)*general_setting['train_test_split_ratio']):, :]
self.model = XGBClassifier(max_depth=int(best_params['max_depth']), learning_rate=best_params['learning_rate'], n_estimators=int(best_params['n_estimators']),
gamma=best_params['gamma'], min_child_weight=best_params['min_child_weight'], subsample=best_params['subsample'],
colsample_bytree=best_params['colsample_bytree'], use_label_encoder=False, eval_metric='mlogloss',
random_seed = int(best_params['random_seed']))
self.model.fit(X_train, Y_train)
self.f1_score_training = f1_score(Y_test, self.model.predict(X_test), average='weighted')
algorithm.plot('f1_score_training', 'XGBoost', self.f1_score_training)
# Train the model
self.model.fit(self.X_train, self.Y_train)
self.is_ready = True
return
# def make_prediction()
def make_prediction(self, X_test, algorithm):
if not self.is_ready:
algorithm.debug(F'{algorithm.Time}: model is not ready')
return
self.X_test = X_test
predicted_signal = self.label_encoder.inverse_transform([self.model.predict(self.X_test)[0]])[0]
classes = [self.label_encoder.inverse_transform([i])[0] for i in self.model.classes_]
pred_proba = self.model.predict_proba(self.X_test)[0]
pred_proba_dict = {int(classes[i]): pred_proba[i] for i in range(len(classes))}
self.pred_proba_dict = pred_proba_dict
self.prediction = predicted_signal
pred_proba_dict['Time'] = algorithm.Time.date()
algorithm.signal_probabilities_container.append(pred_proba_dict)
return predicted_signal
#%% Support Vector Machine
class SVM_model(ML_model):
def __init__(self):
self.model = SVC()
#self.model = tree.DecisionTreeClassifier()
self.X_train = pd.DataFrame()
self.Y_train = pd.DataFrame()
self.X_test = pd.DataFrame()
self.features = []
self.is_ready = False
self.f1_score_training = 0
# Define the parameter bounds
self.param_bounds = {
'C': (0.1, 10),
'gamma': (0.0001, 1),
'random_state': (1, 100)
}
self.prediction = 0
self.pred_proba_dict = {}
def objective_function(self, C, gamma, random_state):
# Initialize the model with current hyperparameters
model = SVC(C=C, gamma=gamma, kernel='rbf', random_state=int(random_state))
# Perform cross-validation
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, self.X_train, self.Y_train, scoring='f1_weighted', cv=kfold)
# Return the mean accuracy as the optimization target
return scores.mean()
def train_model(self, data_X, data_Y, algorithm):
self.X_train = data_X
self.Y_train = data_Y
tuner = BayesianOptimization(
f = self.objective_function,
pbounds=self.param_bounds,
random_state=42,
verbose=2
)
tuner.maximize(init_points=10, n_iter=50)
best_params = tuner.max['params']
# Evaluate the model
X_train = self.X_train.iloc[:int(len(self.X_train)*general_setting['train_test_split_ratio']), :]
Y_train = self.Y_train.iloc[:int(len(self.Y_train)*general_setting['train_test_split_ratio']), :]
X_test = self.X_train.iloc[int(len(self.X_train)*general_setting['train_test_split_ratio']):, :]
Y_test = self.Y_train.iloc[int(len(self.Y_train)*general_setting['train_test_split_ratio']):, :]
self.model = SVC(
C=best_params['C'],
gamma=best_params['gamma'],
kernel='rbf',
random_state=int(best_params['random_state']),
probability = True
)
self.model.fit(X_train, Y_train)
self.f1_score_training = f1_score(Y_test, self.model.predict(X_test), average='weighted')
algorithm.plot('f1_score_training', 'SVM', self.f1_score_training)
# Train the model
self.model.fit(self.X_train, self.Y_train)
self.is_ready = True
return
def make_prediction(self, X_test, algorithm):
if not self.is_ready:
algorithm.debug(F'{algorithm.Time}: model is not ready')
return
self.X_test = X_test
predicted_signal = self.model.predict(self.X_test)[0]
classes = self.model.classes_
pred_proba = self.model.predict_proba(self.X_test)[0]
pred_proba_dict = {int(classes[i]): pred_proba[i] for i in range(len(classes))}
self.pred_proba_dict = pred_proba_dict
self.prediction = predicted_signal
pred_proba_dict['Time'] = algorithm.Time.date()
algorithm.signal_probabilities_container.append(pred_proba_dict)
return predicted_signal
#%% K-Nearest Neighbors
class KNN_model(ML_model):
def __init__(self):
self.model = KNeighborsClassifier()
#self.model = tree.DecisionTreeClassifier()
self.X_train = pd.DataFrame()
self.Y_train = pd.DataFrame()
self.X_test = pd.DataFrame()
self.features = []
self.is_ready = False
self.f1_score_training = 0
# Define the parameter bounds
self.param_bounds = {
'n_neighbors': (1, 30),
'p': (1, 2),
}
self.prediction = 0
self.pred_proba_dict = {}
def objective_function(self, n_neighbors, p):
# Initialize the model with current hyperparameters
model = KNeighborsClassifier(n_neighbors=int(n_neighbors), p=p)
# Perform cross-validation
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, self.X_train, self.Y_train, scoring='f1_weighted', cv=kfold)
# Return the mean accuracy as the optimization target
return scores.mean()
def train_model(self, data_X, data_Y, algorithm):
self.X_train = data_X
self.Y_train = data_Y
tuner = BayesianOptimization(
f = self.objective_function,
pbounds=self.param_bounds,
random_state=42,
verbose=2
)
tuner.maximize(init_points=10, n_iter=50)
best_params = tuner.max['params']
# Evaluate the model
X_train = self.X_train.iloc[:int(len(self.X_train)*general_setting['train_test_split_ratio']), :]
Y_train = self.Y_train.iloc[:int(len(self.Y_train)*general_setting['train_test_split_ratio']), :]
X_test = self.X_train.iloc[int(len(self.X_train)*general_setting['train_test_split_ratio']):, :]
Y_test = self.Y_train.iloc[int(len(self.Y_train)*general_setting['train_test_split_ratio']):, :]
self.model = KNeighborsClassifier(
n_neighbors=int(best_params['n_neighbors']),
p=best_params['p'],
)
self.model.fit(X_train, Y_train)
self.f1_score_training = f1_score(Y_test, self.model.predict(X_test), average='weighted')
algorithm.plot('f1_score_training', 'KNN', self.f1_score_training)
# Traing the model
self.model.fit(self.X_train, self.Y_train)
self.is_ready = True
return
def make_prediction(self, X_test, algorithm):
if not self.is_ready:
algorithm.debug(F'{algorithm.Time}: model is not ready')
return
self.X_test = X_test
predicted_signal = self.model.predict(self.X_test)[0]
classes = self.model.classes_
pred_proba = self.model.predict_proba(self.X_test)[0]
pred_proba_dict = {int(classes[i]): pred_proba[i] for i in range(len(classes))}
self.pred_proba_dict = pred_proba_dict
self.prediction = predicted_signal
pred_proba_dict['Time'] = algorithm.Time.date()
algorithm.signal_probabilities_container.append(pred_proba_dict)
return predicted_signal
#%% FNN
class FNN_model:
def __init__(self):
self.model = None
#self.model = tree.DecisionTreeClassifier()
self.X_train = pd.DataFrame()
self.Y_train = pd.DataFrame()
self.X_test = pd.DataFrame()
self.features = []
self.is_ready = False
self.f1_score_training = 0
self.param_bounds = {
'hidden_units': (10, 50),
'num_layers': (1, 3),
'learning_rate': (0.001, 0.01),
'batch_size': (16, 32),
'epochs': (10, 50),
'random_seed': (1, 100)
}
self.prediction = 0
self.pred_proba_dict = {}
self.early_stopping = EarlyStopping(monitor='val_loss', patience = 5, restore_best_weights=True)
def objective_function(self, hidden_units, num_layers, learning_rate, batch_size, epochs, random_seed):
try:
hidden_units = int(hidden_units)
num_layers = int(num_layers)
batch_size = int(batch_size)
epochs = int(epochs)
# setting seed for reproducability
np.random.seed(int(random_seed))
tf.random.set_seed(int(random_seed))
random.seed(int(random_seed))
# Build the model
model = Sequential()
model.add(Dense(hidden_units, activation='relu', input_shape=(self.X_train.shape[1], )))
for _ in range(num_layers-1):
model.add(Dense(hidden_units, activation='relu'))
model.add(Dense(3, activation='softmax'))
# Compile the model
model.compile(optimizer = Adam(learning_rate=learning_rate), loss='categorical_crossentropy', metrics=['accuracy'])
# train the model
model.fit(self.X_train, self.Y_train, epochs=epochs, batch_size=batch_size, verbose=0, callbacks=[self.early_stopping])
y_pred = model.predict(self.X_train)
y_pred_classes = tf.argmax(y_pred).numpy()
y_train_classes = tf.argmax(self.Y_train).numpy()
score = f1_score(y_train_classes, y_pred_classes, average='weighted')
return score
except Exception as e:
Console.write(f'Error during evaluation: {e}')
return -1.0
def train_model(self, data_X, data_Y, algorithm):
self.X_train = data_X
self.Y_train = tf.keras.utils.to_categorical(data_Y, num_classes=3)
# Hyperparameter Tuning
optimizer = BayesianOptimization(
f=self.objective_function,
pbounds=self.param_bounds,
random_state=42,
verbose=2
)
optimizer.maximize(init_points=10, n_iter=50)
best_params = optimizer.max['params']
# Evaluate the model
np.random.seed(int(best_params['random_seed']))
tf.random.set_seed(int(best_params['random_seed']))
random.seed(int(best_params['random_seed']))
X_train = self.X_train.iloc[:int(len(self.X_train)*general_setting['train_test_split_ratio']), :]
Y_train = self.Y_train[:int(len(self.Y_train)*general_setting['train_test_split_ratio']), :]
X_test = self.X_train.iloc[int(len(self.X_train)*general_setting['train_test_split_ratio']):, :]
Y_test = data_Y.iloc[int(len(self.Y_train)*general_setting['train_test_split_ratio']):, :]
self.model = Sequential()
self.model.add(Dense(int(best_params['hidden_units']), activation='relu', input_shape=(self.X_train.shape[1], )))
for _ in range(int(best_params['num_layers']) - 1):
self.model.add(Dense(int(best_params['hidden_units']), activation='relu'))
self.model.add(Dense(3, activation='softmax'))
self.model.compile(
optimizer=Adam(learning_rate=best_params['learning_rate']),
loss='categorical_crossentropy',
metrics=['accuracy']
)
self.model.fit(
X_train, Y_train,
epochs=int(best_params['epochs']),
batch_size=int(best_params['batch_size']),
verbose=1,
callbacks=[self.early_stopping]
)
y_pred = np.argmax(self.model.predict(X_test), axis=1)
self.f1_score_training = f1_score(Y_test, y_pred, average='weighted')
# Train the model
self.model.fit(self.X_train, self.Y_train)
self.is_ready = True
return
def make_prediction(self, X_test, algorithm):
if not self.is_ready:
algorithm.debug(F'{algorithm.Time}: model is not ready')
return
self.X_test = X_test
predicted_signal = self.model.predict(self.X_test)[0]
algorithm.debug(F'predicted signal: {predicted_signal}')
# classes = self.model.classes_
# pred_proba = self.model.predict_proba(self.X_test)[0]
# pred_proba_dict = {int(classes[i]): pred_proba[i] for i in range(len(classes))}
# self.pred_proba_dict = pred_proba_dict
# self.prediction = predicted_signal
# pred_proba_dict['Time'] = algorithm.Time.date()
# algorithm.signal_probabilities_container.append(pred_proba_dict)
return predicted_signal
# region imports from AlgorithmImports import * # endregion #### discord channel: algo-ops-beta #### https://discord.com/api/webhooks/1309186566563823696/EyQyyH3Ky6yKiCWbOsYFVKWdOhm6aYAgeT1swx_TLuJkSf2DnbjFTuEG_BY93z8xAWzU ## kevin.stoll - 2024.11.22