| Overall Statistics |
|
Total Orders 2502 Average Win 1.52% Average Loss -0.49% Compounding Annual Return 3.881% Drawdown 70.400% Expectancy 0.051 Start Equity 1000000 End Equity 1056172.50 Net Profit 5.617% Sharpe Ratio 0.302 Sortino Ratio 0.426 Probabilistic Sharpe Ratio 17.340% Loss Rate 74% Win Rate 26% Profit-Loss Ratio 3.10 Alpha -0.221 Beta 2.936 Annual Standard Deviation 0.615 Annual Variance 0.378 Information Ratio 0.083 Tracking Error 0.57 Treynor Ratio 0.063 Total Fees $131721.83 Estimated Strategy Capacity $5600000.00 Lowest Capacity Asset BABA VU1EHIDJYJXH Portfolio Turnover 262.91% |
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import yfinance as yf
import pandas_ta as ta
from IPython.core.display import display, HTML
from collections import Counter
import time
from datetime import datetime
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier,GradientBoostingClassifier,VotingClassifier,StackingClassifier
from sklearn.svm import LinearSVC
from sklearn.svm import SVC as SupportVectorClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split,GridSearchCV,cross_val_score,KFold,StratifiedKFold
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,confusion_matrix,roc_curve, roc_auc_score
def symbolShift(df, num_shift, drop_first_shift=True, Label=True, shifted_column='Close'):
    """Build a frame of lagged copies of one column for supervised learning.

    Parameters
    ----------
    df : pd.DataFrame
        Source frame containing `shifted_column`.
    num_shift : int
        Number of lag columns sh1..sh{num_shift} to create.
    drop_first_shift : bool
        If True, drop the first `num_shift` rows (NaNs from shifting).
    Label : bool
        If True, add a binary 'Label' column: 1 when the current value rose
        versus its 1-step lag, 0 when it fell (an exact tie keeps 0.0).
    shifted_column : str
        Name of the column to lag.

    Returns
    -------
    pd.DataFrame with the original column, its lags, and optionally 'Label'.
    """
    X = df[shifted_column]
    shifted_dict = {shifted_column: df[shifted_column]}
    for i in range(1, num_shift + 1):
        shifted_dict[f'sh{i}'] = X.shift(i)
    X2 = pd.DataFrame(shifted_dict)
    if drop_first_shift == True:
        X2 = X2[num_shift::]
    if Label == True:
        # BUG FIX: the label compares the *shifted column* with its 1-step
        # lag.  The original referenced a hard-coded 'returns' column, which
        # raised KeyError for any other `shifted_column` (e.g. the default
        # 'Close').
        X2['Label'] = X2[shifted_column] - X2['sh1']
        X2.loc[X2['Label'] < 0, 'Label'] = 0
        X2.loc[X2['Label'] > 0, 'Label'] = 1
    return X2
def InputOutput(dataframe, label, traintest=True, split_size=None):
    """Split a frame into features X (all columns but `label`) and target y.

    When traintest is True, additionally performs a (shuffled) sklearn
    train_test_split with test fraction `split_size` and returns
    (Xtrain, Xtest, ytrain, ytest); otherwise returns (X, y).
    """
    X = dataframe.drop(label, axis=1)
    y = dataframe[label]
    if traintest == True:
        Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, test_size=split_size)
        return Xtrain, Xtest, ytrain, ytest
    else:
        return X, y
    # Removed dead code: an unreachable `return fig.show()` that referenced
    # an undefined name `fig` (leftover from a plotting helper).
def slicingTimeseries(X, y, sliceTrain=0.8):
    """Chronological (non-shuffled) train/test split for time-series data.

    The first int(len * sliceTrain) rows become the training span; the
    remainder is the test span.  Works on anything sliceable with .shape.
    """
    cut = int(X.shape[0] * sliceTrain)
    return X[:cut], X[cut:], y[:cut], y[cut:]
def AddOsil(df):
    """Append return and oscillator/trend feature columns in place.

    Adds 'returns' (1-period pct change of Close), 'RSI' (period 15), and
    three EMAs of the close: 'EMAF' (20), 'EMAM' (100), 'EMAS' (150).
    Mutates and returns the same frame.
    """
    close = df['Close']
    df['returns'] = close.pct_change()
    df['RSI'] = ta.rsi(close, length=15)
    for column, span in (('EMAF', 20), ('EMAM', 100), ('EMAS', 150)):
        df[column] = ta.ema(close, length=span)
    return df
def AddLabel(df):
    """Attach binary up/down day labels in place.

    'Label' is 1 when Close > Open, 0 when Close < Open (a flat day keeps
    the raw 0.0 difference, i.e. stays 0).  'Label2' is tomorrow's label
    (Label shifted back one row) — the prediction target.
    """
    diff = df['Close'] - df['Open']
    df['Label'] = diff
    df.loc[diff < 0, 'Label'] = 0
    df.loc[diff > 0, 'Label'] = 1
    df['Label2'] = df['Label'].shift(-1)
    return df
def TimeSeriesGen(df, shift):
    """Build a lagged feature matrix and next-day target.

    Returns (X, y): y is 'Label2'; X is the remaining feature columns
    concatenated with `shift` lagged copies of themselves.  Duplicate
    column names are deliberately kept (downstream code indexes the
    unlagged copy positionally).
    """
    y = df['Label2']
    base = df.drop(['Label', 'Label2', 'Open', 'High', 'Low'], axis=1)
    frames = [base] + [base.shift(lag) for lag in range(1, shift + 1)]
    X = pd.concat(frames, axis=1)
    return X, y
def backtester(data, model, amount=10000, fee=0.01, period=400, lev=1):
    """Walk-forward long/short backtest driven by `model` predictions.

    At step i the model predicts from row i-1; a prediction of 1 goes long
    (earns row i's return), 0 goes short (earns the negated return).  A
    flat per-trade `fee` is deducted before each move.  `lev` borrows
    (lev-1)*amount; the loop stops early if equity drops to the borrowed
    amount.  Returns the net profit over the borrowed base.

    NOTE(review): `...returns.iloc[0][0]` relies on duplicated 'returns'
    columns (as produced by TimeSeriesGen's lag concat) — confirm the
    frame layout before reusing elsewhere.
    """
    longs = 0
    shorts = 0
    borrow = amount * (lev - 1)
    equity = amount * lev
    for step in range(1, period):
        signal = model.predict(data.iloc[step - 1:step])
        if signal == 1:
            longs += 1
            equity -= fee
            equity = equity * ((data.iloc[step:step + 1].returns.iloc[0][0] * 100) + 100) / 100
        elif signal == 0:
            shorts += 1
            equity -= fee
            equity = equity * (-(data.iloc[step:step + 1].returns.iloc[0][0] * 100) + 100) / 100
        if equity <= borrow:
            break
    return equity - borrow
def Buybacktester(data, amount=10000, fee=0.01, period=400, lev=1):
    """Buy-and-hold benchmark with the same accounting as `backtester`.

    The signal is hard-wired to 1 (always long, fee charged each step) so
    its net profit is directly comparable with the model-driven result.
    Note: the return is read from row i-1 here, one row earlier than in
    `backtester`.  Prints a margin-call notice and stops if equity falls
    to the borrowed amount.
    """
    longs = 0
    borrow = amount * (lev - 1)
    equity = amount * lev
    for step in range(1, period):
        # Original always took the long branch (ytomo was fixed at 1); the
        # dead short branch has been dropped.
        longs += 1
        equity -= fee
        equity = equity * ((data.iloc[step - 1:step].returns.iloc[0][0] * 100) + 100) / 100
        if equity <= borrow:
            print('Call Marjin')
            break
    return equity - borrow
def BaselineModels(Xlearn,ylearn,Xval,yval):
    """Train a zoo of default-hyperparameter classifiers and score them.

    Fits each base model, plus hard-voting and stacking ensembles built
    from the same estimator instances (sklearn clones estimators inside
    ensembles, so sharing instances is safe), on (Xlearn, ylearn) and
    evaluates on (Xval, yval).

    Returns a dict: name -> {'model', 'acc', 'backtest', 'nnp',
    'NormalizedAcc', 'precision', 'recall', 'f1'}, where 'backtest' is the
    model-driven net profit over the validation span and 'nnp' the
    buy-and-hold benchmark profit.
    """
    Model = {}
    Model['GBC'] = {'model': GradientBoostingClassifier()}
    Model['LRC'] = {'model': LogisticRegression()}
    Model['DTC'] = {'model': DecisionTreeClassifier()}
    Model['RFC'] = {'model':RandomForestClassifier()}
    Model['KNC'] = {'model': KNeighborsClassifier()} #7
    Model['GNB'] = {'model': GaussianNB()}
    Model['LSVC'] = {'model': LinearSVC()}
    Model['SVC'] = {'model': SupportVectorClassifier()}
    Model['MLP'] = {'model': MLPClassifier()}
    # Ensembles over every base model above.
    Model['VOT'] = {'model': VotingClassifier(estimators=[('GBC', Model['GBC']['model']), ('LRC', Model['LRC']['model']), ('GNB', Model['GNB']['model']),
                                                          ('DTC', Model['DTC']['model']), ('KNC', Model['KNC']['model']), ('RFC', Model['RFC']['model']),
                                                          ('LSVC', Model['LSVC']['model']), ('SVC', Model['SVC']['model']), ('MLP', Model['MLP']['model'])
                                                          ],
                                              voting='hard')}
    Model['STK'] = {'model': StackingClassifier(estimators=[('GBC', Model['GBC']['model']), ('LRC', Model['LRC']['model']), ('GNB', Model['GNB']['model']),
                                                            ('DTC', Model['DTC']['model']), ('KNC', Model['KNC']['model']), ('RFC', Model['RFC']['model']),
                                                            ('LSVC', Model['LSVC']['model']), ('SVC', Model['SVC']['model']), ('MLP', Model['MLP']['model'])
                                                            ],
                                                final_estimator=LogisticRegression())}
    #----------------------------------------------------
    # Majority-class share of yval: the accuracy of always predicting the
    # majority class, used below to normalize each model's accuracy.
    counts = Counter(yval)
    print('norm yval count:',counts)
    NormalizeAcc = counts[1] / (counts[0]+counts[1])
    print(NormalizeAcc)
    if(NormalizeAcc < 0.5):
        NormalizeAcc = 1 - NormalizeAcc
        print(NormalizeAcc)
    #----------------------------------------------------
    for i in Model.keys():
        Model[i]['model'].fit(Xlearn, ylearn)
        ypred = Model[i]['model'].predict(Xval)
        accuracy = accuracy_score(yval, ypred)
        precision = precision_score(yval, ypred)
        recall = recall_score(yval, ypred)
        f1 = f1_score(yval, ypred)
        # Scaled so that exactly matching the majority-class baseline
        # scores 0.5.
        NormalizedAcc = (accuracy * 0.5) / NormalizeAcc
        backtest = backtester(Xval ,model=Model[i]['model'] ,period = Xval.shape[0], lev=1)
        nnp = Buybacktester(Xval , amount=10000 , fee=0.01 , period=Xval.shape[0] , lev=1)
        Model[i]['acc'] = accuracy
        Model[i]['backtest'] = backtest
        Model[i]['nnp'] = nnp
        Model[i]['NormalizedAcc'] = NormalizedAcc
        Model[i]['precision'] = precision
        Model[i]['recall'] = recall
        Model[i]['f1'] = f1
    return Model
#--------------------------------------------------------------------------------------------------------
def ParamsModels(Xlearn,ylearn,Xval,yval):
    """Same model zoo as BaselineModels but with hand-tuned hyperparameters.

    Fits each classifier (plus hard-voting and stacking ensembles over the
    same estimator instances) on (Xlearn, ylearn), evaluates on
    (Xval, yval), and returns name -> metrics dict with the same keys as
    BaselineModels.
    """
    paramModel = {}
    paramModel['GBC'] = {'model': GradientBoostingClassifier(n_estimators=100, learning_rate=0.01, max_depth=12)}
    paramModel['LRC'] = {'model': LogisticRegression(C=0.1, max_iter=10000, solver='lbfgs')}
    paramModel['DTC'] = {'model': DecisionTreeClassifier(max_depth = 12, min_samples_split=6, min_samples_leaf=4)}
    paramModel['RFC'] = {'model': RandomForestClassifier(n_estimators = 500, max_depth = 10)}
    paramModel['KNC'] = {'model': KNeighborsClassifier(n_neighbors = 7, weights='distance')}
    paramModel['GNB'] = {'model': GaussianNB(var_smoothing=1e-09)}
    paramModel['LSVC'] = {'model': LinearSVC(C=1 ,max_iter = 10000)}
    paramModel['SVC'] = {'model': SupportVectorClassifier(C=1, kernel='rbf', gamma='scale', max_iter=5000)}
    paramModel['MLP'] = {'model': MLPClassifier(hidden_layer_sizes = (69),max_iter = 10000 ,activation='relu', alpha=0.0001)}
    # Ensembles over every tuned base model above.
    paramModel['VOT'] = {'model': VotingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']), ('GNB', paramModel['GNB']['model']),
                                                               ('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                               ('LSVC', paramModel['LSVC']['model']), ('SVC', paramModel['SVC']['model']), ('MLP', paramModel['MLP']['model'])
                                                               ],
                                                   voting='hard')}
    paramModel['STK'] = {'model': StackingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']), ('GNB', paramModel['GNB']['model']),
                                                                 ('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                                 ('LSVC', paramModel['LSVC']['model']), ('SVC', paramModel['SVC']['model']), ('MLP', paramModel['MLP']['model'])
                                                                 ],
                                                     final_estimator=LogisticRegression())}
    #----------------------------------------------------
    # Majority-class share of yval — baseline accuracy used for the
    # normalized-accuracy metric below.
    counts = Counter(yval)
    NormalizeAcc = counts[1] / (counts[0]+counts[1])
    if(NormalizeAcc < 0.5):
        NormalizeAcc = 1 - NormalizeAcc
    #----------------------------------------------------
    for i in paramModel.keys():
        paramModel[i]['model'].fit(Xlearn, ylearn)
        ypred = paramModel[i]['model'].predict(Xval)
        accuracy = accuracy_score(yval, ypred)
        precision = precision_score(yval, ypred)
        recall = recall_score(yval, ypred)
        f1 = f1_score(yval, ypred)
        # 0.5 at the majority-class baseline, >0.5 when beating it.
        NormalizedAcc = (accuracy * 0.5) / NormalizeAcc
        backtest = backtester(Xval ,paramModel[i]['model'] ,period=Xval.shape[0] ,lev=1)
        nnp = Buybacktester(Xval , amount=10000 , fee=0.01 , period=Xval.shape[0] , lev=1)
        paramModel[i]['acc'] = accuracy
        paramModel[i]['backtest'] = backtest
        paramModel[i]['nnp'] = nnp
        paramModel[i]['NormalizedAcc'] = NormalizedAcc
        paramModel[i]['precision'] = precision
        paramModel[i]['recall'] = recall
        paramModel[i]['f1'] = f1
    return paramModel
#-------------------------------------------------------------------------------------------------------
def GridSearchModels(Xlearn,ylearn,Xval,yval):
    """Grid-search each classifier's hyperparameters, then score the best.

    For every model family, runs a 5-fold accuracy-scored GridSearchCV on
    (Xlearn, ylearn), takes the best estimator, and evaluates it on
    (Xval, yval).  Returns name -> metrics dict (same keys as
    BaselineModels plus 'auc').  No ensembles here, unlike the other
    model-zoo builders.
    """
    param_grids = {
        'GBC': {'n_estimators': [50, 100, 200], 'learning_rate': [0.01, 0.1, 1], 'max_depth': [5, 7, 10, 12]},
        'LRC': {'C': [0.1, 1, 10]},
        'DTC': {'max_depth': [3, 5, 7], 'min_samples_split': [4,5,6,7], 'min_samples_leaf': [3,4,5,6]},
        'RFC': {'n_estimators': [50, 100, 200, 400, 600], 'max_depth': [3, 5, 7 ,10 ,14]},
        'KNC': {'n_neighbors': [3, 5, 7, 10, 14]},
        'GNB': {}, # No hyperparameters for GaussianNB
        'LSVC': {'C': [0.1, 1, 10] ,'max_iter' : [1000,5000,10000,20000,40000]},
        'SVC': {'max_iter': [1000,5000,10000,20000,40000],'C': [0.1, 1, 10], 'gamma': ['auto', 'scale']},
        'MLP': {'hidden_layer_sizes': [(20,), (50,), (100,)], 'max_iter': [10000 , 20000]}
    }
    models = {
        'GBC': GradientBoostingClassifier(),
        'LRC': LogisticRegression(),
        'DTC': DecisionTreeClassifier(),
        'RFC': RandomForestClassifier(),
        'KNC': KNeighborsClassifier(),
        'GNB': GaussianNB(),
        'LSVC': LinearSVC(),
        'SVC': SupportVectorClassifier(),
        'MLP': MLPClassifier()
    }
    #----------------------------------------------------
    # Majority-class share of yval — baseline accuracy used for the
    # normalized-accuracy metric below.
    counts = Counter(yval)
    NormalizeAcc = counts[1] / (counts[0]+counts[1])
    if(NormalizeAcc < 0.5):
        NormalizeAcc = 1 - NormalizeAcc
    #----------------------------------------------------
    trained_models = {}
    for name, model in models.items():
        grid_search = GridSearchCV(model, param_grids[name], cv=5, scoring='accuracy', n_jobs=-1)
        grid_search.fit(Xlearn, ylearn)
        best_model = grid_search.best_estimator_
        #print(f"Best hyperparameters for {name}: {grid_search.best_params_}")
        # NOTE(review): this refit is redundant — GridSearchCV's default
        # refit=True already fits best_estimator_ on the full (Xlearn, ylearn).
        best_model.fit(Xlearn, ylearn)
        ypred = best_model.predict(Xval)
        accuracy = accuracy_score(yval, ypred)
        precision = precision_score(yval, ypred)
        recall = recall_score(yval, ypred)
        f1 = f1_score(yval, ypred)
        # NOTE(review): AUC is computed from hard 0/1 predictions, not
        # probabilities, so it degenerates to balanced accuracy — confirm
        # whether predict_proba-based AUC was intended.
        auc = roc_auc_score(yval, ypred)
        # 0.5 at the majority-class baseline, >0.5 when beating it.
        NormalizedAcc = (accuracy * 0.5) / NormalizeAcc
        backtest = backtester(Xval ,best_model ,period=Xval.shape[0] ,lev=1)
        nnp = Buybacktester(Xval , amount=10000 , fee=0.01 , period=Xval.shape[0] , lev=1)
        trained_models[name] = {'model': best_model,
                                'acc': accuracy ,
                                'backtest' : backtest,
                                'nnp' : nnp,
                                'NormalizedAcc' : NormalizedAcc,
                                'precision' : precision,
                                'recall' : recall,
                                'f1' : f1,
                                'auc' : auc}
        #print(f"Best hyperparameters: {grid_search.best_params_}")
        #print(f"GS Test accuracy: {accuracy} Norm Test accuracy:{Naccuracy} \n")
    return trained_models
#--------------------------------------------------------------------------------------------------------
def Main(data, gridsearch=False, shift=10):
    """Per-symbol pipeline: features -> labels -> lag matrix -> model zoos.

    Returns (dic1, dic2, Xtest, ytest): the baseline model dict, a second
    dict from ParamsModels (gridsearch=False) or GridSearchModels
    (gridsearch=True), and the held-out chronological test split.
    """
    data = AddOsil(data)
    data = AddLabel(data)
    X, y = TimeSeriesGen(data, shift)
    # Skip the first 160 rows (warm-up for the 150-period EMA plus the lag
    # columns) and the last row, whose next-day label is NaN; then split
    # 90/10 chronologically, and the training span again 90/10 into
    # learn/validation.
    Xtrain, Xtest, ytrain, ytest = slicingTimeseries(X[160:-1:], y[160:-1:], sliceTrain=0.90)#95
    Xlearn, Xval, ylearn, yval = slicingTimeseries(Xtrain, ytrain, sliceTrain=0.90)#7
    dic1 = BaselineModels(Xlearn, ylearn, Xval, yval)

    def _report(second, header):
        # Side-by-side accuracy comparison for model names present in both dicts.
        print(header)
        for name, stats in dic1.items():
            for other_name, other in second.items():
                if name == other_name:
                    print(name, '===', stats['NormalizedAcc'], other['NormalizedAcc'], stats['acc'], other['acc'])

    if gridsearch == False:
        dic2 = ParamsModels(Xlearn, ylearn, Xval, yval)
        _report(dic2, 'Models-------NorNormAcc:---------ParNormAcc--------NormalACC:---------ParamACC:')
        return dic1, dic2, Xtest, ytest
    elif gridsearch == True:
        dicGS = GridSearchModels(Xlearn, ylearn, Xval, yval)
        _report(dicGS, 'Models-------NorNormAcc:---------GridNormAcc--------NormalACC:---------GridACC:')
        return dic1, dicGS, Xtest, ytest
#--------------------------------------------------------------------------------------------------------
def CreateDF(Maindf):
    """Flatten the per-symbol model dictionaries into one summary DataFrame.

    One row per trained model; the index is '<SYMBOL> <F>' where F marks
    the family: N (baseline 'BModel'), P (tuned 'PModel'), G (grid-search
    'GModel', optional).  Adds 'ValProfit %' = validation backtest profit
    over buy-and-hold profit * 100.
    """
    index = []
    metric_cols = ['model', 'Valacc', 'ValNormalizedAcc', 'Valprecision',
                   'Valrecall', 'Valf1', 'Valbacktest', 'Valnnp']
    data = {col: [] for col in metric_cols}

    def _collect(row_label, stats):
        # Append one model's validation metrics as a new row.
        index.append(row_label)
        data['model'].append(stats['model'])
        data['Valacc'].append(stats['acc'])
        data['ValNormalizedAcc'].append(stats['NormalizedAcc'])
        data['Valprecision'].append(stats['precision'])
        data['Valrecall'].append(stats['recall'])
        data['Valf1'].append(stats['f1'])
        data['Valbacktest'].append(stats['backtest'])
        data['Valnnp'].append(stats['nnp'])

    for symbol in Maindf.keys():
        for stats in Maindf[symbol]['BModel'].values():
            _collect(symbol + ' N', stats)
        for stats in Maindf[symbol]['PModel'].values():
            _collect(symbol + ' P', stats)
        if 'GModel' in Maindf[symbol].keys():
            for stats in Maindf[symbol]['GModel'].values():
                _collect(symbol + ' G', stats)
        else:
            print("GModel does not exist in row", symbol)
    df = pd.DataFrame(data, index=index)
    df['ValProfit %'] = df['Valbacktest'] / df['Valnnp'] * 100
    return df
#-----------------------------------------------------------------------------------------------------
def ADDTestFeatures(df, Maindf):
    """Append held-out test-set backtest results to the model summary frame.

    For each row (indexed 'TICKER X'), backtests the row's stored model on
    that ticker's Xtest span alongside a buy-and-hold benchmark, then
    labels the row 1 when the strategy exceeds 103% of buy-and-hold.
    Mutates and returns df.
    """
    strategy_profits = []
    hold_profits = []
    for pos, row_name in enumerate(df.index):
        ticker = row_name[:-2]              # strip the ' N'/' P'/' G' suffix
        Xt = Maindf[ticker]['Xtest']
        _ = Maindf[ticker]['ytest']         # unused; kept from the removed metric block
        strategy_profits.append(backtester(Xt, df['model'].values[pos], period=Xt.shape[0], lev=1))
        hold_profits.append(Buybacktester(Xt, amount=10000, fee=0.01, period=Xt.shape[0], lev=1))
    df['Testbacktest'] = strategy_profits
    df['Testnnp'] = hold_profits
    df['TestProfit %'] = df['Testbacktest'] / df['Testnnp'] * 100
    # Binary target for the second-level "instrument selector" model.
    df['Label'] = np.where(df['TestProfit %'] > 103, 1, 0)
    return df
#-----------------------------------------------------------------------------------------------------
def TrainValTestSplit(df, test_size=0.30, val_size=0.30):
    """Shuffled learn/val/test split of the model-summary frame.

    Returns (BXlearn, BXval, Bylearn, Byval, BXtest, Bytest, z) where z
    keeps the outcome columns of the test rows (they are dropped from the
    feature frames to avoid leakage) for later reporting.
    """
    leak_cols = ['Testbacktest', 'TestProfit %', 'model', 'Testnnp', 'Label']
    y = df['Label']
    BXtrain, BXtest, Bytrain, Bytest = train_test_split(df, y, test_size=test_size, shuffle=True)
    #, 'Valacc' , 'Valbacktest' , 'Valnnp'
    z = BXtest[leak_cols]
    BXtrain = BXtrain.drop(leak_cols, axis=1)
    BXtest = BXtest.drop(leak_cols, axis=1)
    BXlearn, BXval, Bylearn, Byval = train_test_split(BXtrain, Bytrain, test_size=val_size)
    return BXlearn, BXval, Bylearn, Byval, BXtest, Bytest, z
#-----------------------------------------------------------------------------------------------------
def BestModelFinder(BXtrain, BXtest, Bytrain, Bytest):
    """Train the second-level 'instrument selector' model zoo.

    Fits hand-tuned classifiers (no LSVC/SVC here, unlike ParamsModels) on
    the model-summary features to predict which (ticker, model) rows beat
    the buy-and-hold threshold.  Records metrics on both splits under
    'Learn*' and 'Val*' keys.

    NOTE(review): parameter names say train/test, but Initialize passes
    the learn/validation split — the 'Val*' keys reflect that usage.
    """
    paramModel = {}
    paramModel['GBC'] = {'model': GradientBoostingClassifier(n_estimators=100, learning_rate=0.01, max_depth=12)}
    paramModel['LRC'] = {'model': LogisticRegression(C=0.1, max_iter=10000, solver='lbfgs')}
    paramModel['DTC'] = {'model': DecisionTreeClassifier(max_depth = 12, min_samples_split=6, min_samples_leaf=4)}
    paramModel['RFC'] = {'model': RandomForestClassifier(n_estimators = 500, max_depth = 10)}
    paramModel['KNC'] = {'model': KNeighborsClassifier(n_neighbors = 7, weights='distance')} #7
    paramModel['GNB'] = {'model': GaussianNB(var_smoothing=1e-09)}
    paramModel['MLP'] = {'model': MLPClassifier(hidden_layer_sizes = (69),max_iter = 10000 ,activation='relu', alpha=0.0001)}
    # Ensembles over the base models above (sklearn clones the shared
    # instances internally).
    paramModel['VOT'] = {'model': VotingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']),
                                                               ('GNB', paramModel['GNB']['model']),('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                               ('MLP', paramModel['MLP']['model'])
                                                               ],
                                                   voting='hard')}
    paramModel['STK'] = {'model': StackingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']),
                                                                 ('GNB', paramModel['GNB']['model']),('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                                 ('MLP', paramModel['MLP']['model'])
                                                                 ],
                                                     final_estimator=LogisticRegression())}
    for i in paramModel.keys():
        paramModel[i]['model'].fit(BXtrain, Bytrain)
        # Held-out split metrics ('Val*').
        ypred = paramModel[i]['model'].predict(BXtest)
        accuracy = accuracy_score(Bytest, ypred)
        precision = precision_score(Bytest, ypred)
        recall = recall_score(Bytest, ypred)
        f1 = f1_score(Bytest, ypred)
        # In-sample metrics ('Learn*') — useful for spotting overfitting.
        ypred2 = paramModel[i]['model'].predict(BXtrain)
        accuracy2 = accuracy_score(Bytrain, ypred2)
        precision2 = precision_score(Bytrain, ypred2)
        recall2 = recall_score(Bytrain, ypred2)
        f12 = f1_score(Bytrain, ypred2)
        paramModel[i]['LearnACC'] = accuracy2
        paramModel[i]['Learnprecision'] = precision2
        paramModel[i]['Learnrecall'] = recall2
        paramModel[i]['Learnf1'] = f12
        paramModel[i]['ValACC'] = accuracy
        paramModel[i]['Valprecision'] = precision
        paramModel[i]['Valrecall'] = recall
        paramModel[i]['Valf1'] = f1
    return paramModel
#-----------------------------------------------------------------------------------------------------
def plotFinalModel(model, p, data , title ,lev=1):
    """Plot cumulative market returns vs the model strategy's returns.

    p: number of trailing rows to evaluate; lev: leverage multiplier
    applied to strategy returns.  Shows a matplotlib figure; returns None.
    """
    # Last p rows, first 77 columns.  NOTE(review): 77 hard-codes the
    # feature width the model was trained on — confirm against the
    # TimeSeriesGen output shape for the chosen `shift`.
    X = data.iloc[-p::,:77:]
    X["output"] = model.predict(X.iloc[::,::])
    # NOTE(review): X["returns"].iloc[:, 0] assumes duplicated 'returns'
    # columns (base + lagged copies); .iloc[:, 0] selects the unlagged one.
    X["NormalReturns"] = X["returns"].iloc[:,0].cumsum()
    # Map class 0 -> -1 so predictions become position signs (+1 long, -1 short).
    X["output"] = X["output"].replace(0 , -1)
    # Trade on the previous row's prediction to avoid look-ahead.
    X['outputshift'] = X['output'].shift(1)
    X['strategy'] = X["returns"].iloc[:,0] * X["outputshift"] * lev
    X["StrategyReturns"] = X["strategy"].cumsum()
    X[["NormalReturns","StrategyReturns"]].plot(figsize = (12 , 8))
    plt.title(title)
    plt.show()
#-----------------------------------------------------------------------------------------------------
def plotBestModel(BestDic, BXtest, Bestmetric='ValACC', lev=1, showResultDf=True, label='PLabel', showbest=False, Maindf=None, z=None):
    """Use the best selector model to pick rows and plot their strategies.

    Picks the selector with the highest `Bestmetric` from BestDic, labels
    the BXtest rows with it, then plots either the single best-Valacc pick
    (showbest=True) or every row predicted 1.

    Maindf and z were previously read as module globals; they are now
    optional parameters that fall back to those globals, so existing call
    sites keep working.
    """
    # Fall back to the notebook-style module globals when not supplied.
    if Maindf is None:
        Maindf = globals()['Maindf']
    if z is None:
        z = globals()['z']
    best_model_name = max(BestDic, key=lambda key: BestDic[key][Bestmetric])
    best_model_test_acc = BestDic[best_model_name]['ValACC']
    if 'PLabel' in BXtest.columns:
        BXtest = BXtest.drop(['PLabel'], axis=1)
    ypred = BestDic[best_model_name]['model'].predict(BXtest)
    print('best instrument-model selector is:', best_model_name, 'with acc:', best_model_test_acc)
    result_df = pd.concat([BXtest, z], axis=1)
    result_df['PLabel'] = ypred
    if showbest == True:
        picked = result_df[result_df[label] == 1]
        max_row = picked[picked['Valacc'] == picked['Valacc'].max()]
        ticker = max_row.index[0][:-2]
        # BUG FIX: the per-symbol dict stores the test span under 'Xtest'
        # (see Main/Initialize); the original 'XTest' key raised KeyError.
        TestData = Maindf[ticker]['Xtest']
        ChosseModel = max_row['model'].values[0]
        display(ChosseModel)
        plotFinalModel(ChosseModel, TestData.shape[0], TestData, ticker, lev=lev)
    else:
        picked = result_df[result_df[label] == 1]
        for i in range(1, picked.shape[0] + 1):
            ticker = picked[i - 1:i].index[0][:-2]
            # BUG FIX: 'Xtest', not 'XTest' (consistent with the rest of the file).
            TestData = Maindf[ticker]['Xtest']
            ChosseModel = picked[i - 1:i]['model'].values[0]
            display(ChosseModel)
            plotFinalModel(ChosseModel, TestData.shape[0], TestData, ticker, lev=lev)
    if showResultDf == True:
        pd.set_option('display.max_columns', 9)
        display(result_df[result_df[label] == 1])
        pd.reset_option('display.max_columns')
#-------------------------------------------------------------------------------------------------
def PlotSumModel(Finaldf, symbolsreturn):
    """Plot combined strategy returns against combined buy-and-hold returns."""
    summary = pd.DataFrame()
    # Sequential assignment (not a dict constructor) so the second series
    # is aligned to the first one's index, as before.
    summary['ModelsReturn'] = Finaldf["column_sums"]
    summary['NormalReturn'] = symbolsreturn["column_sums"]
    summary[["ModelsReturn", "NormalReturn"]].plot(figsize=(12, 8))
    plt.title('sum of all tradings')
    plt.show()
#-------------------------------------------------------------------------------------------------
def SumBestModels(BestDic, BXtest, lev=1, label='PLabel', LimitRows=False, Maindf=None, z=None):
    """Aggregate cumulative strategy and buy-and-hold returns per pick.

    The best selector (highest ValACC + Valprecision in BestDic) labels
    the BXtest rows; rows predicted 1 (optionally restricted to
    ValProfit % > 100 when LimitRows=True) are backtested with their
    stored per-ticker model on that ticker's test span.

    Returns (df1, df2): per-pick cumulative strategy returns and raw
    symbol returns, each forward-filled with a 'column_sums' total column.

    Maindf and z were previously read as module globals; they are now
    optional parameters that fall back to those globals, keeping the
    original call sites working.
    """
    # Fall back to the notebook-style module globals when not supplied.
    if Maindf is None:
        Maindf = globals()['Maindf']
    if z is None:
        z = globals()['z']
    def Bestreturns(model, p, data, lev=1):
        # Position for day t is the model's day t-1 prediction mapped to
        # +1 (long) / -1 (short), applied to day t's return with leverage.
        # NOTE(review): X["returns"].iloc[:, 0] assumes duplicated
        # 'returns' columns from TimeSeriesGen's lag concat — confirm.
        X = data.iloc[-p::, ::]
        X["output"] = model.predict(X.iloc[::, ::])
        X["NormalReturns"] = X["returns"].iloc[:, 0].cumsum()
        X["output"] = X["output"].replace(0, -1)
        X['outputshift'] = X['output'].shift(1)
        X['strategy'] = X["returns"].iloc[:, 0] * X["outputshift"] * lev
        X["StrategyReturns"] = X["strategy"].cumsum()
        return X["StrategyReturns"], X["NormalReturns"]
    best_model_name = max(BestDic, key=lambda key: BestDic[key]["ValACC"] + BestDic[key]["Valprecision"])
    best_model_test_acc = BestDic[best_model_name]['ValACC']
    if 'PLabel' in BXtest.columns:
        BXtest = BXtest.drop(['PLabel'], axis=1)
    ypred = BestDic[best_model_name]['model'].predict(BXtest)
    print('best instrument-model selector is:', best_model_name, 'with acc:', best_model_test_acc)
    result_df = pd.concat([BXtest, z], axis=1)
    result_df['PLabel'] = ypred
    finaldic = {}
    symbolreturn = {}
    df1 = result_df[result_df['PLabel'] == 1]
    selected_rows = df1[df1['ValProfit %'] > 100]
    if LimitRows == False:
        SelectedDF = df1
    else:
        SelectedDF = selected_rows
    for i in range(1, SelectedDF.shape[0] + 1):
        ticker = SelectedDF[i - 1:i].index[0][:-2]
        TestData = Maindf[ticker]['Xtest']
        ChosseModel = SelectedDF[i - 1:i]['model'].values[0]
        finaldic[ticker + str(i)], symbolreturn[ticker + str(i)] = Bestreturns(ChosseModel, TestData.shape[0], TestData, lev=lev)
    df1 = pd.DataFrame(finaldic)
    df2 = pd.DataFrame(symbolreturn)
    # .ffill() replaces the deprecated fillna(method='ffill') form
    # (removed in pandas 3.0).
    df1.ffill(inplace=True)
    df2.ffill(inplace=True)
    df1['column_sums'] = df1.sum(axis=1)
    df2['column_sums'] = df2.sum(axis=1)
    return df1, df2
#-----------------------------------------------------------------------------------------------------
def SumBestModelsImp(MainIdf, BestDic, BXtest, z, lev=1, label='PLabel', LimitRows=False):
    """Improved SumBestModels taking its data dependencies explicitly.

    The best selector (highest ValACC + Valprecision in BestDic) labels
    the BXtest rows; rows predicted 1 (optionally restricted to
    ValProfit % > 100 when LimitRows=True) are backtested with their
    stored per-ticker model on that ticker's test span from MainIdf.

    Returns (df1, df2, SelectedDF): per-pick cumulative strategy returns,
    raw symbol returns (both forward-filled with a 'column_sums' total),
    and the selected summary rows used later for live order placement.
    """
    def Bestreturns(model, p, data, lev=1):
        # Position for day t is the model's day t-1 prediction mapped to
        # +1 (long) / -1 (short), applied to day t's return with leverage.
        # NOTE(review): X["returns"].iloc[:, 0] assumes duplicated
        # 'returns' columns from TimeSeriesGen's lag concat — confirm.
        X = data.iloc[-p::, ::]
        X["output"] = model.predict(X.iloc[::, ::])
        X["NormalReturns"] = X["returns"].iloc[:, 0].cumsum()
        X["output"] = X["output"].replace(0, -1)
        X['outputshift'] = X['output'].shift(1)
        X['strategy'] = X["returns"].iloc[:, 0] * X["outputshift"] * lev
        X["StrategyReturns"] = X["strategy"].cumsum()
        return X["StrategyReturns"], X["NormalReturns"]
    #------------------------------------------------------------------
    best_model_name = max(BestDic, key=lambda key: BestDic[key]["ValACC"] + BestDic[key]["Valprecision"])
    best_model_test_acc = BestDic[best_model_name]['ValACC']
    if 'PLabel' in BXtest.columns:
        BXtest = BXtest.drop(['PLabel'], axis=1)
    ypred = BestDic[best_model_name]['model'].predict(BXtest)
    print('best instrument-model selector is:', best_model_name, 'with acc:', best_model_test_acc)
    result_df = pd.concat([BXtest, z], axis=1)
    result_df['PLabel'] = ypred
    finaldic = {}
    symbolreturn = {}
    df1 = result_df[result_df['PLabel'] == 1]
    selected_rows = df1[df1['ValProfit %'] > 100]
    if LimitRows == False:
        SelectedDF = df1
    else:
        SelectedDF = selected_rows
    for i in range(1, SelectedDF.shape[0] + 1):
        ticker = SelectedDF[i - 1:i].index[0][:-2]
        TestData = MainIdf[ticker]['Xtest']
        ChosseModel = SelectedDF[i - 1:i]['model'].values[0]
        finaldic[ticker + str(i)], symbolreturn[ticker + str(i)] = Bestreturns(ChosseModel, TestData.shape[0], TestData, lev=lev)
    df1 = pd.DataFrame(finaldic)
    df2 = pd.DataFrame(symbolreturn)
    # .ffill() replaces the deprecated fillna(method='ffill') form
    # (removed in pandas 3.0).
    df1.ffill(inplace=True)
    df2.ffill(inplace=True)
    df1['column_sums'] = df1.sum(axis=1)
    df2['column_sums'] = df2.sum(axis=1)
    return df1, df2, SelectedDF
# Your New Python File
from AlgorithmImports import *
from lib import *
import warnings
from sklearn.exceptions import UndefinedMetricWarning, ConvergenceWarning
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=pd.errors.SettingWithCopyWarning)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import yfinance as yf
import pandas_ta as ta
from IPython.core.display import display, HTML
from collections import Counter
import time
from datetime import datetime
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.ensemble import StackingClassifier
from sklearn.svm import LinearSVC
from sklearn.svm import SVC as SupportVectorClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve, roc_auc_score
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
class MyAlgorithm(QCAlgorithm):
    """Daily ML long/short strategy for a small equity basket.

    Initialize downloads history per symbol, runs the full model-training
    pipeline (Main), trains a second-level 'instrument selector' over the
    per-model summary frame, and keeps the selected (ticker, model) rows
    in self.SelectedF.  A daily 10:00 schedule places long/short
    SetHoldings orders from each selected model's prediction; CloseOrders
    liquidates everything.
    """
    def Initialize(self):
        self.SetStartDate(2023, 1, 1)
        self.SetCash(1000000)
        #self.symbols = ["AAPL", "MSFT", "GOOG", "AMZN", "FB", "TSLA", "NFLX", "NVDA", "BABA", "JPM", "BND", "SPY"]
        self.symbols = [ "TSLA", "BABA", "BND"]
        self.shifting = 1                     # lag depth fed to TimeSeriesGen
        self.MainIdf = {}                     # symbol -> {'DataFrame','BModel','PModel','Xtest','ytest'}
        self.instList = []                    # (Symbol object, ticker string) pairs
        qb = QuantBook()                      # NOTE(review): unused — research-notebook leftover
        # Download historical data and set leverage
        self.Log('downloads: -------------------------------------------------------')
        for symbol in self.symbols:
            equity = self.AddEquity(symbol, Resolution.Daily)
            equity.SetLeverage(2)
            history = self.History(equity.Symbol, 1800, Resolution.Daily)
            df = history.loc[symbol]
            # Rename to the capitalized OHLCV names the feature helpers expect.
            df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close', 'volume': 'Volume'}, inplace=True)
            self.MainIdf[symbol] = {'DataFrame': df}
            self.instList.append((equity.Symbol, symbol))
            self.Log(f"{symbol} : Downloaded")
        # Process data with the Main function
        for symbol in self.symbols:
            self.Log(f"{symbol} Main func output:")
            data = self.MainIdf[symbol]['DataFrame']
            d1, d2, testx, testy = Main(data, gridsearch=False, shift=self.shifting)
            self.MainIdf[symbol]['BModel'] = d1
            self.MainIdf[symbol]['PModel'] = d2
            self.MainIdf[symbol]['Xtest'] = testx
            self.MainIdf[symbol]['ytest'] = testy
        # Further training and evaluation
        self.Log('second train: -------------------------------------------------------')
        df2 = CreateDF(self.MainIdf)
        df3 = ADDTestFeatures(df2, self.MainIdf)
        BXlearn, BXval, Bylearn, Byval, BXtest, Bytest, z = TrainValTestSplit(df3, test_size=0.25, val_size=0.33)
        BestDic = BestModelFinder(BXlearn, BXval, Bylearn, Byval)
        self.Log('outputs: -------------------------------------------------------')
        # NOTE(review): lev=3 here while SetLeverage(2) is applied above —
        # confirm the intended leverage.
        self.Finaldf, self.symbolsreturn, self.SelectedF = SumBestModelsImp(self.MainIdf, BestDic, BXtest, z, lev=3, label='PLabel', LimitRows=False)
        PlotSumModel(self.Finaldf, self.symbolsreturn)
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.PlaceOrders)
        # NOTE(review): DateRules.Tomorrow fires only on the next day, not
        # every day — confirm CloseOrders is scheduled as intended.
        self.Schedule.On(self.DateRules.Tomorrow, self.TimeRules.At(10, 0), self.CloseOrders)
    def OnData(self, data):
        # No per-tick logic; trading is fully schedule-driven.
        pass
    def PlaceOrders(self):
        """Predict today's direction per selected model and open positions.

        NOTE(review): SetHoldings(instrument, +/-1) targets 100% of the
        portfolio per ticker; with several selected rows the combined
        allocation can exceed 100% — confirm the sizing is intentional.
        """
        self.Log('orders: -------------------------------------------------------')
        ordertype = {}
        for i in range(1, self.SelectedF.shape[0] + 1):
            # Row index is 'TICKER X'; strip the 2-char family suffix.
            ticker = self.SelectedF.iloc[i - 1].name[:-2]
            instrument = None
            for j in range(len(self.instList)):
                if self.instList[j][1] == ticker:
                    instrument = self.instList[j][0]
                    self.Log(instrument)
                    break
            # Rebuild features on the latest 200 rows so the final row holds
            # today's lagged feature vector.  NOTE(review): AddOsil/AddLabel
            # mutate this slice of the stored frame (SettingWithCopy warnings
            # are suppressed file-wide) — verify no unwanted carry-over.
            TData = self.MainIdf[ticker]['DataFrame'][-200:]
            ToData = AddOsil(TData)
            TodData = AddLabel(ToData)
            TodaData, y = TimeSeriesGen(TodData, self.shifting)
            TodayData = TodaData[-1:]
            ChosseModel = self.SelectedF.iloc[i - 1]['model']
            order = ChosseModel.predict(TodayData)
            self.Log(order)
            if order[0] == 1:
                self.Log(f'{instrument} Buy Order Placed for ticker: {ticker}')
                self.SetHoldings(instrument, 1) # Long position
                ordertype[ticker + str(i)] = 'Buy'
            elif order[0] == 0:
                self.Log(f'{instrument} Sell Order Placed for ticker: {ticker}')
                self.SetHoldings(instrument, -1) # Short position
                ordertype[ticker + str(i)] = 'Sell'
        self.Log('---------------------------------------------------------')
        self.Log('Orders Placement is done.')
    def CloseOrders(self):
        """Liquidate every open position."""
        self.Log('Closing all positions after 24 hours')
        self.Liquidate()