| Overall Statistics | |
| --- | --- |
| Total Orders | 552 |
| Average Win | 0.17% |
| Average Loss | -0.02% |
| Compounding Annual Return | 88.538% |
| Drawdown | 22.900% |
| Expectancy | 10.683 |
| Start Equity | 1000000 |
| End Equity | 2143890.13 |
| Net Profit | 114.389% |
| Sharpe Ratio | 1.799 |
| Sortino Ratio | 2.106 |
| Probabilistic Sharpe Ratio | 74.952% |
| Loss Rate | 4% |
| Win Rate | 96% |
| Profit-Loss Ratio | 11.15 |
| Alpha | 0.224 |
| Beta | 2.36 |
| Annual Standard Deviation | 0.323 |
| Annual Variance | 0.104 |
| Information Ratio | 1.722 |
| Tracking Error | 0.25 |
| Treynor Ratio | 0.246 |
| Total Fees | $547.77 |
| Estimated Strategy Capacity | $54000000.00 |
| Lowest Capacity Asset | GOOCV VP83T1ZUHROL |
| Portfolio Turnover | 1.75% |
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import yfinance as yf
import pandas_ta as ta
from IPython.core.display import display, HTML
from collections import Counter
import time
from datetime import datetime
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier,GradientBoostingClassifier,VotingClassifier,StackingClassifier
from sklearn.svm import LinearSVC
from sklearn.svm import SVC as SupportVectorClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split,GridSearchCV,cross_val_score,KFold,StratifiedKFold
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,confusion_matrix,roc_curve, roc_auc_score
def symbolShift(df, num_shift, drop_first_shift=True, Label=True, shifted_column='Close'):
    """Build a lag-feature frame from one column of *df*.

    Produces a DataFrame with the original *shifted_column* plus columns
    'sh1'..'sh<num_shift>' holding that column shifted by 1..num_shift rows.

    Parameters
    ----------
    df : DataFrame containing *shifted_column*.
    num_shift : number of lagged copies to create.
    drop_first_shift : drop the first *num_shift* rows (they contain NaNs
        introduced by shifting).
    Label : when True, append a binary 'Label' column: 1 when the current
        value rose versus its 1-step lag, 0 when it fell (a zero change is
        left as 0 by the sign thresholds).
    shifted_column : name of the column to lag.

    Returns the assembled DataFrame.
    """
    base = df[shifted_column]
    shifted = {shifted_column: base}
    for lag in range(1, num_shift + 1):
        shifted[f'sh{lag}'] = base.shift(lag)
    out = pd.DataFrame(shifted)
    if drop_first_shift:
        out = out[num_shift:]
    if Label:
        # BUG FIX: the original referenced out['returns'], a column that is
        # never created here (KeyError unless shifted_column == 'returns').
        # The label is the direction of the move versus the 1-step lag.
        out['Label'] = out[shifted_column] - out['sh1']
        out.loc[out['Label'] < 0, 'Label'] = 0
        out.loc[out['Label'] > 0, 'Label'] = 1
    return out
def InputOutput(dataframe, label, traintest=True, split_size=None):
    """Split *dataframe* into features X (all columns except *label*) and target y.

    Parameters
    ----------
    dataframe : source DataFrame.
    label : name of the target column.
    traintest : when True, return a random sklearn train/test split of (X, y);
        when False, return (X, y) untouched.
    split_size : forwarded to train_test_split as test_size (None lets
        sklearn use its default fraction).

    Returns (Xtrain, Xtest, ytrain, ytest) or (X, y).
    """
    X = dataframe.drop(label, axis=1)
    y = dataframe[label]
    if traintest:
        return train_test_split(X, y, test_size=split_size)
    # BUG FIX: the original had an unreachable `return fig.show()` after this
    # return, referencing an undefined `fig`; dead code removed.
    return X, y
def slicingTimeseries(X, y, sliceTrain=0.8):
    """Chronological (non-shuffled) train/test split of aligned X and y.

    The first int(len(X) * sliceTrain) rows become the training slice, the
    remainder the test slice; order is preserved, which matters for
    time-series data.

    Returns (Xtrain, Xtest, ytrain, ytest).
    """
    cut = int(X.shape[0] * sliceTrain)
    return X[:cut], X[cut:], y[:cut], y[cut:]
def AddOsil(df):
    """Append oscillator/trend feature columns to *df* in place and return it.

    Adds:
      - 'returns': simple percentage change of Close,
      - 'RSI': 15-period RSI,
      - 'EMAF'/'EMAM'/'EMAS': fast/medium/slow EMAs (20/100/150 periods),
    all computed from the 'Close' column via pandas_ta.
    """
    close = df['Close']
    df['returns'] = close.pct_change()
    df['RSI'] = ta.rsi(close, length=15)
    for column, span in (('EMAF', 20), ('EMAM', 100), ('EMAS', 150)):
        df[column] = ta.ema(close, length=span)
    return df
def AddLabel(df):
    """Attach direction labels to *df* in place and return it.

    'Label' is 1 for an up bar (Close > Open), 0 for a down bar, and the raw
    zero difference when Close == Open.  'Label2' is the next bar's label
    (shifted back by one row), i.e. the prediction target for "tomorrow".
    """
    move = df['Close'] - df['Open']
    # Up bars -> 1, down bars -> 0; flat bars keep their (zero) difference.
    df['Label'] = np.where(move > 0, 1, np.where(move < 0, 0, move))
    df['Label2'] = df['Label'].shift(-1)
    return df
def TimeSeriesGen(df, shift):
    """Turn a labelled feature frame into lagged time-series inputs.

    y is the 'Label2' column (tomorrow's direction).  X is the frame with
    label and OHL columns removed, concatenated side-by-side with *shift*
    lagged copies of itself (shifted by 1..shift rows).  Column names are
    duplicated across the lagged copies, matching the original behaviour.

    Returns (X, y).
    """
    y = df['Label2']
    base = df.drop(['Label', 'Label2', 'Open', 'High', 'Low'], axis=1)
    frames = [base] + [base.shift(k) for k in range(1, shift + 1)]
    X = pd.concat(frames, axis=1)
    return X, y
def backtester(data , model , amount=10000 , fee=0.01 , period=400 , lev=1):
    """Walk-forward backtest of *model* over the first *period* rows of *data*.

    Each step predicts from yesterday's feature row (i-1) and settles the
    position with today's return (row i): long when the model outputs 1,
    short when it outputs 0.  A flat *fee* is deducted per trade.  *lev* > 1
    simulates margin: the borrowed portion is subtracted from the final
    equity, and the loop stops early if equity falls to the borrowed amount.

    Returns the net profit (final equity minus borrowed capital).
    """
    buy = 0
    sell = 0
    baseAmount = amount  # kept for reference; not used below
    borrow = amount*(lev-1)  # borrowed capital under leverage
    amount = amount*lev
    for i in range(1,period):
        ytomo = model.predict(data.iloc[i-1:i])
        if ytomo == 1 :
            buy +=1
            amount -=fee
            # NOTE(review): data.iloc[i:i+1].returns is presumably a DataFrame
            # (duplicate 'returns' columns from TimeSeriesGen's lag concat), so
            # .iloc[0][0] picks the un-shifted return — confirm.
            amount=amount*((data.iloc[i:i+1].returns.iloc[0][0] * 100) +100)/ 100
        elif ytomo == 0 :
            sell +=1
            amount -=fee
            # Short position: gains when the return is negative.
            amount=amount*(-(data.iloc[i:i+1].returns.iloc[0][0] * 100) +100)/ 100
        #print(amount)
        if amount <= borrow:
            # Equity no longer covers the borrowed amount: margin exhausted.
            break
    netprof = amount-borrow
    return netprof
def Buybacktester(data , amount=10000 , fee=0.01 , period=400 , lev=1):
    """Buy-and-hold benchmark: backtester() bookkeeping with the signal fixed to 1.

    Always goes long each bar, paying *fee* every step, and returns the net
    profit after subtracting any borrowed capital (lev > 1).

    NOTE(review): this applies the return at row i-1 for loop index i, while
    backtester() uses row i — a one-bar misalignment between the strategy and
    its benchmark; confirm which indexing is intended.
    """
    buy = 0
    sell = 0
    baseAmount = amount  # kept for parity with backtester(); unused
    borrow = amount*(lev-1)
    amount = amount*lev
    for i in range(1,period):
        ytomo =1  # constant "buy" signal
        if ytomo == 1 :
            buy +=1
            amount -=fee
            amount=amount*((data.iloc[i-1:i].returns.iloc[0][0] * 100) +100)/ 100
        elif ytomo == 0 :
            # Unreachable with the constant signal; kept to mirror backtester().
            sell +=1
            amount -=fee
            amount=amount*(-(data.iloc[i-1:i].returns.iloc[0][0] * 100) +100)/ 100
        #print(amount)
        if amount <= borrow:
            print('Call Marjin')
            break
    netprof = amount-borrow
    return netprof
def BaselineModels(Xlearn,ylearn,Xval,yval):
    """Fit a zoo of default-parameter classifiers and score each on the validation set.

    Trains GBC/LRC/DTC/RFC/KNC/GNB/LSVC/SVC/MLP plus hard-voting and stacking
    ensembles built from those same estimator instances.  For each model the
    returned dict entry gains classification metrics ('acc', 'precision',
    'recall', 'f1'), a baseline-scaled 'NormalizedAcc', the model-driven
    'backtest' profit and the buy-and-hold benchmark 'nnp'.

    Returns dict: name -> {'model': fitted estimator, ...metrics}.
    """
    Model = {}
    Model['GBC'] = {'model': GradientBoostingClassifier()}
    Model['LRC'] = {'model': LogisticRegression()}
    Model['DTC'] = {'model': DecisionTreeClassifier()}
    Model['RFC'] = {'model':RandomForestClassifier()}
    Model['KNC'] = {'model': KNeighborsClassifier()} #7
    Model['GNB'] = {'model': GaussianNB()}
    Model['LSVC'] = {'model': LinearSVC()}
    Model['SVC'] = {'model': SupportVectorClassifier()}
    Model['MLP'] = {'model': MLPClassifier()}
    # The ensembles reference the same estimator objects registered above;
    # sklearn clones estimators internally when fitting ensembles.
    Model['VOT'] = {'model': VotingClassifier(estimators=[('GBC', Model['GBC']['model']), ('LRC', Model['LRC']['model']), ('GNB', Model['GNB']['model']),
                                                          ('DTC', Model['DTC']['model']), ('KNC', Model['KNC']['model']), ('RFC', Model['RFC']['model']),
                                                          ('LSVC', Model['LSVC']['model']), ('SVC', Model['SVC']['model']), ('MLP', Model['MLP']['model'])
                                                          ],
                                              voting='hard')}
    Model['STK'] = {'model': StackingClassifier(estimators=[('GBC', Model['GBC']['model']), ('LRC', Model['LRC']['model']), ('GNB', Model['GNB']['model']),
                                                            ('DTC', Model['DTC']['model']), ('KNC', Model['KNC']['model']), ('RFC', Model['RFC']['model']),
                                                            ('LSVC', Model['LSVC']['model']), ('SVC', Model['SVC']['model']), ('MLP', Model['MLP']['model'])
                                                            ],
                                                final_estimator=LogisticRegression())}
    #----------------------------------------------------
    # Majority-class share of yval = accuracy of a constant predictor; used
    # below to rescale raw accuracy into 'NormalizedAcc'.
    counts = Counter(yval)
    print('norm yval count:',counts)
    NormalizeAcc = counts[1] / (counts[0]+counts[1])
    print(NormalizeAcc)
    if(NormalizeAcc < 0.5):
        NormalizeAcc = 1 - NormalizeAcc
    print(NormalizeAcc)
    #----------------------------------------------------
    for i in Model.keys():
        Model[i]['model'].fit(Xlearn, ylearn)
        ypred = Model[i]['model'].predict(Xval)
        accuracy = accuracy_score(yval, ypred)
        precision = precision_score(yval, ypred)
        recall = recall_score(yval, ypred)
        f1 = f1_score(yval, ypred)
        # Rescale so a majority-class guesser would score 0.5.
        NormalizedAcc = (accuracy * 0.5) / NormalizeAcc
        # Trading-based scores: model-driven backtest vs. plain buy-and-hold.
        backtest = backtester(Xval ,model=Model[i]['model'] ,period = Xval.shape[0], lev=1)
        nnp = Buybacktester(Xval , amount=10000 , fee=0.01 , period=Xval.shape[0] , lev=1)
        Model[i]['acc'] = accuracy
        Model[i]['backtest'] = backtest
        Model[i]['nnp'] = nnp
        Model[i]['NormalizedAcc'] = NormalizedAcc
        Model[i]['precision'] = precision
        Model[i]['recall'] = recall
        Model[i]['f1'] = f1
    return Model
#--------------------------------------------------------------------------------------------------------
def ParamsModels(Xlearn,ylearn,Xval,yval):
    """Same pipeline as BaselineModels() but with hand-tuned hyper-parameters.

    Fits the parameterised model zoo plus voting/stacking ensembles, scores
    everything on the validation set, and attaches the same metric keys
    ('acc', 'precision', 'recall', 'f1', 'NormalizedAcc', 'backtest', 'nnp').

    Returns dict: name -> {'model': fitted estimator, ...metrics}.
    """
    paramModel = {}
    paramModel['GBC'] = {'model': GradientBoostingClassifier(n_estimators=100, learning_rate=0.01, max_depth=12)}
    paramModel['LRC'] = {'model': LogisticRegression(C=0.1, max_iter=10000, solver='lbfgs')}
    paramModel['DTC'] = {'model': DecisionTreeClassifier(max_depth = 12, min_samples_split=6, min_samples_leaf=4)}
    paramModel['RFC'] = {'model': RandomForestClassifier(n_estimators = 500, max_depth = 10)}
    paramModel['KNC'] = {'model': KNeighborsClassifier(n_neighbors = 7, weights='distance')}
    paramModel['GNB'] = {'model': GaussianNB(var_smoothing=1e-09)}
    paramModel['LSVC'] = {'model': LinearSVC(C=1 ,max_iter = 10000)}
    paramModel['SVC'] = {'model': SupportVectorClassifier(C=1, kernel='rbf', gamma='scale', max_iter=5000)}
    paramModel['MLP'] = {'model': MLPClassifier(hidden_layer_sizes = (69),max_iter = 10000 ,activation='relu', alpha=0.0001)}
    paramModel['VOT'] = {'model': VotingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']), ('GNB', paramModel['GNB']['model']),
                                                               ('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                               ('LSVC', paramModel['LSVC']['model']), ('SVC', paramModel['SVC']['model']), ('MLP', paramModel['MLP']['model'])
                                                               ],
                                                   voting='hard')}
    paramModel['STK'] = {'model': StackingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']), ('GNB', paramModel['GNB']['model']),
                                                                 ('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                                 ('LSVC', paramModel['LSVC']['model']), ('SVC', paramModel['SVC']['model']), ('MLP', paramModel['MLP']['model'])
                                                                 ],
                                                     final_estimator=LogisticRegression())}
    #----------------------------------------------------
    # Majority-class share of yval (constant-predictor accuracy baseline).
    counts = Counter(yval)
    NormalizeAcc = counts[1] / (counts[0]+counts[1])
    if(NormalizeAcc < 0.5):
        NormalizeAcc = 1 - NormalizeAcc
    #----------------------------------------------------
    for i in paramModel.keys():
        paramModel[i]['model'].fit(Xlearn, ylearn)
        ypred = paramModel[i]['model'].predict(Xval)
        accuracy = accuracy_score(yval, ypred)
        precision = precision_score(yval, ypred)
        recall = recall_score(yval, ypred)
        f1 = f1_score(yval, ypred)
        # Rescale so a majority-class guesser would score 0.5.
        NormalizedAcc = (accuracy * 0.5) / NormalizeAcc
        backtest = backtester(Xval ,paramModel[i]['model'] ,period=Xval.shape[0] ,lev=1)
        nnp = Buybacktester(Xval , amount=10000 , fee=0.01 , period=Xval.shape[0] , lev=1)
        paramModel[i]['acc'] = accuracy
        paramModel[i]['backtest'] = backtest
        paramModel[i]['nnp'] = nnp
        paramModel[i]['NormalizedAcc'] = NormalizedAcc
        paramModel[i]['precision'] = precision
        paramModel[i]['recall'] = recall
        paramModel[i]['f1'] = f1
    return paramModel
#-------------------------------------------------------------------------------------------------------
def GridSearchModels(Xlearn,ylearn,Xval,yval):
    """Grid-search each base classifier, then score the best estimator on the validation set.

    Unlike BaselineModels()/ParamsModels(), no voting/stacking ensembles are
    built, and an additional 'auc' metric is recorded.

    Returns dict: name -> {'model': best estimator, ...metrics}.
    """
    # Hyper-parameter search space per model key.
    param_grids = {
        'GBC': {'n_estimators': [50, 100, 200], 'learning_rate': [0.01, 0.1, 1], 'max_depth': [5, 7, 10, 12]},
        'LRC': {'C': [0.1, 1, 10]},
        'DTC': {'max_depth': [3, 5, 7], 'min_samples_split': [4,5,6,7], 'min_samples_leaf': [3,4,5,6]},
        'RFC': {'n_estimators': [50, 100, 200, 400, 600], 'max_depth': [3, 5, 7 ,10 ,14]},
        'KNC': {'n_neighbors': [3, 5, 7, 10, 14]},
        'GNB': {}, # No hyperparameters for GaussianNB
        'LSVC': {'C': [0.1, 1, 10] ,'max_iter' : [1000,5000,10000,20000,40000]},
        'SVC': {'max_iter': [1000,5000,10000,20000,40000],'C': [0.1, 1, 10], 'gamma': ['auto', 'scale']},
        'MLP': {'hidden_layer_sizes': [(20,), (50,), (100,)], 'max_iter': [10000 , 20000]}
    }
    models = {
        'GBC': GradientBoostingClassifier(),
        'LRC': LogisticRegression(),
        'DTC': DecisionTreeClassifier(),
        'RFC': RandomForestClassifier(),
        'KNC': KNeighborsClassifier(),
        'GNB': GaussianNB(),
        'LSVC': LinearSVC(),
        'SVC': SupportVectorClassifier(),
        'MLP': MLPClassifier()
    }
    #----------------------------------------------------
    # Majority-class share of yval (constant-predictor accuracy baseline).
    counts = Counter(yval)
    NormalizeAcc = counts[1] / (counts[0]+counts[1])
    if(NormalizeAcc < 0.5):
        NormalizeAcc = 1 - NormalizeAcc
    #----------------------------------------------------
    trained_models = {}
    for name, model in models.items():
        grid_search = GridSearchCV(model, param_grids[name], cv=5, scoring='accuracy', n_jobs=-1)
        grid_search.fit(Xlearn, ylearn)
        best_model = grid_search.best_estimator_
        #print(f"Best hyperparameters for {name}: {grid_search.best_params_}")
        # NOTE: best_estimator_ is already refit on the full training data by
        # GridSearchCV (refit=True default); this second fit is redundant but
        # harmless.
        best_model.fit(Xlearn, ylearn)
        ypred = best_model.predict(Xval)
        accuracy = accuracy_score(yval, ypred)
        precision = precision_score(yval, ypred)
        recall = recall_score(yval, ypred)
        f1 = f1_score(yval, ypred)
        # NOTE(review): AUC is computed from hard 0/1 predictions rather than
        # probabilities, which understates the usual ROC-AUC — confirm intent.
        auc = roc_auc_score(yval, ypred)
        # Rescale so a majority-class guesser would score 0.5.
        NormalizedAcc = (accuracy * 0.5) / NormalizeAcc
        backtest = backtester(Xval ,best_model ,period=Xval.shape[0] ,lev=1)
        nnp = Buybacktester(Xval , amount=10000 , fee=0.01 , period=Xval.shape[0] , lev=1)
        trained_models[name] = {'model': best_model,
                                'acc': accuracy ,
                                'backtest' : backtest,
                                'nnp' : nnp,
                                'NormalizedAcc' : NormalizedAcc,
                                'precision' : precision,
                                'recall' : recall,
                                'f1' : f1,
                                'auc' : auc}
        #print(f"Best hyperparameters: {grid_search.best_params_}")
        #print(f"GS Test accuracy: {accuracy} Norm Test accuracy:{Naccuracy} \n")
    return trained_models
#--------------------------------------------------------------------------------------------------------
def Main(data, gridsearch=False, shift=10):
    """End-to-end training pipeline for one instrument's OHLCV frame.

    Adds features and labels, builds lagged inputs, splits chronologically
    into learn/val/test windows, trains the baseline model zoo, and trains
    either the hand-tuned (ParamsModels) or grid-searched (GridSearchModels)
    zoo depending on *gridsearch*.  Prints a per-model accuracy comparison.

    Parameters
    ----------
    data : OHLCV DataFrame with 'Open'/'High'/'Low'/'Close' columns.
    gridsearch : use GridSearchModels instead of ParamsModels when truthy.
    shift : number of lagged feature copies for TimeSeriesGen.

    Returns (baseline dict, tuned/grid dict, Xtest, ytest).
    """
    data = AddOsil(data)
    data = AddLabel(data)
    X, y = TimeSeriesGen(data, shift)
    # Skip the first 160 rows (NaN warm-up from the 150-period EMA plus lags)
    # and the last row (its Label2 target is NaN).
    Xtrain, Xtest, ytrain, ytest = slicingTimeseries(X[160:-1:], y[160:-1:], sliceTrain=0.90)
    Xlearn, Xval, ylearn, yval = slicingTimeseries(Xtrain, ytrain, sliceTrain=0.90)
    dic1 = BaselineModels(Xlearn, ylearn, Xval, yval)
    if gridsearch:
        dic2 = GridSearchModels(Xlearn, ylearn, Xval, yval)
        print('Models-------NorNormAcc:---------GridNormAcc--------NormalACC:---------GridACC:')
    else:
        dic2 = ParamsModels(Xlearn, ylearn, Xval, yval)
        print('Models-------NorNormAcc:---------ParNormAcc--------NormalACC:---------ParamACC:')
    # Direct key lookup replaces the original O(n^2) double loop over both
    # dicts; models missing from dic2 (e.g. no VOT/STK after grid search)
    # are simply not printed, as before.
    for name, info in dic1.items():
        if name in dic2:
            print(name, '===', info['NormalizedAcc'], dic2[name]['NormalizedAcc'],
                  info['acc'], dic2[name]['acc'])
    return dic1, dic2, Xtest, ytest
#--------------------------------------------------------------------------------------------------------
def CreateDF(Maindf):
    """Flatten per-ticker model dictionaries into one summary DataFrame.

    For each ticker, rows are appended for every baseline model ('BModel',
    index suffix ' N'), tuned model ('PModel', suffix ' P') and — when present
    — grid-searched model ('GModel', suffix ' G').  Each row carries the
    model object and its validation metrics; a final 'ValProfit %' column is
    the strategy backtest expressed as a percentage of buy-and-hold.

    Parameters
    ----------
    Maindf : dict of ticker -> {'BModel': {...}, 'PModel': {...}, ['GModel': {...}]}
        where each inner dict maps a model name to its metrics dict
        (keys: 'model', 'acc', 'NormalizedAcc', 'precision', 'recall',
        'f1', 'backtest', 'nnp').

    Returns the assembled DataFrame, indexed by '<ticker> <suffix>'.
    """
    index = []
    data = {'model': [], 'Valacc': [], 'ValNormalizedAcc': [], 'Valprecision': [],
            'Valrecall': [], 'Valf1': [], 'Valbacktest': [], 'Valnnp': []}

    def _append_row(ticker, suffix, metrics):
        # One row per trained model: copy its validation metrics into the table.
        # (Replaces three copy-pasted append blocks in the original.)
        index.append(ticker + suffix)
        data['model'].append(metrics['model'])
        data['Valacc'].append(metrics['acc'])
        data['ValNormalizedAcc'].append(metrics['NormalizedAcc'])
        data['Valprecision'].append(metrics['precision'])
        data['Valrecall'].append(metrics['recall'])
        data['Valf1'].append(metrics['f1'])
        data['Valbacktest'].append(metrics['backtest'])
        data['Valnnp'].append(metrics['nnp'])

    for ticker in Maindf.keys():
        for metrics in Maindf[ticker]['BModel'].values():
            _append_row(ticker, ' N', metrics)
        for metrics in Maindf[ticker]['PModel'].values():
            _append_row(ticker, ' P', metrics)
        if 'GModel' in Maindf[ticker]:
            for metrics in Maindf[ticker]['GModel'].values():
                _append_row(ticker, ' G', metrics)
        else:
            print("GModel does not exist in row", ticker)
    df = pd.DataFrame(data, index=index)
    df['ValProfit %'] = df['Valbacktest'] / df['Valnnp'] * 100
    return df
#-----------------------------------------------------------------------------------------------------
def ADDTestFeatures(df , Maindf):
    """Append held-out test-window backtest columns to the summary frame *df*.

    For each row (index '<TICKER> N/P/G') the stored model is run through
    backtester() on that ticker's Xtest window and compared against a
    Buybacktester() buy-and-hold benchmark.  Adds 'Testbacktest', 'Testnnp',
    'TestProfit %' and a binary 'Label' marking strategies that returned more
    than 103% of buy-and-hold.

    Returns the augmented *df*.
    """
    c=0  # positional cursor into df['model'].values, advanced per row
    TESTbackt = []
    nnpt = []
    # TrainAccuracy = []
    #add new featurs to set
    #
    for i in df.index:
        # Index looks like 'AAPL N'; strip the 2-char ' N'/' P'/' G' suffix.
        ticker = i[:-2]
        Xt = Maindf[ticker]['Xtest']
        Yt = Maindf[ticker]['ytest']
        #Yp = df['model'].values[c].predict(Xt)
        #accuracy = accuracy_score(Yt , Yp)
        #precision = precision_score(Yt , Yp)
        #recall = recall_score(Yt , Yp)
        #f1 = f1_score(Yt , Yp)
        #auc = roc_auc_score(Yt , Yp)
        backtest = backtester(Xt ,df['model'].values[c] ,period=Xt.shape[0] ,lev=1)
        Testnnp = Buybacktester(Xt , amount=10000 , fee=0.01 , period=Xt.shape[0] , lev=1)
        c = c+1
        TESTbackt.append(backtest)
        nnpt.append(Testnnp)
    df['Testbacktest'] = TESTbackt
    df['Testnnp'] = nnpt
    df ['TestProfit %'] = df['Testbacktest']/df['Testnnp']*100
    # 1 when the strategy beat buy-and-hold by more than 3% on the test window.
    df['Label'] = np.where(df['TestProfit %'] > 103, 1, 0)
    return df
#-----------------------------------------------------------------------------------------------------
def TrainValTestSplit(df, test_size=0.30, val_size=0.30):
    """Random train/val/test split of the meta-model summary frame.

    The target is the 'Label' column; the test rows' result columns
    ('Testbacktest', 'TestProfit %', 'model', 'Testnnp', 'Label') are kept
    aside in *z* before being dropped from both feature frames so the
    meta-model cannot see them.

    Returns (BXlearn, BXval, Bylearn, Byval, BXtest, Bytest, z).
    """
    held_out_cols = ['Testbacktest', 'TestProfit %', 'model', 'Testnnp', 'Label']
    labels = df['Label']
    BXtrain, BXtest, Bytrain, Bytest = train_test_split(df, labels, test_size=test_size, shuffle=True)
    # Keep the test rows' results/models for later selection and plotting.
    z = BXtest[held_out_cols]
    BXtrain = BXtrain.drop(held_out_cols, axis=1)
    BXtest = BXtest.drop(held_out_cols, axis=1)
    BXlearn, BXval, Bylearn, Byval = train_test_split(BXtrain, Bytrain, test_size=val_size)
    return BXlearn, BXval, Bylearn, Byval, BXtest, Bytest, z
#-----------------------------------------------------------------------------------------------------
def BestModelFinder(BXtrain, BXtest, Bytrain, Bytest):
    """Train meta-classifiers that predict which (ticker, model) rows will be profitable.

    Fits the hand-tuned model zoo (no LinearSVC/SVC here) plus voting and
    stacking ensembles on the meta-features, then records in-sample metrics
    under 'Learn*' and held-out metrics under 'Val*' for each model.

    Returns dict: name -> {'model': fitted estimator, Learn/Val metrics}.
    """
    paramModel = {}
    paramModel['GBC'] = {'model': GradientBoostingClassifier(n_estimators=100, learning_rate=0.01, max_depth=12)}
    paramModel['LRC'] = {'model': LogisticRegression(C=0.1, max_iter=10000, solver='lbfgs')}
    paramModel['DTC'] = {'model': DecisionTreeClassifier(max_depth = 12, min_samples_split=6, min_samples_leaf=4)}
    paramModel['RFC'] = {'model': RandomForestClassifier(n_estimators = 500, max_depth = 10)}
    paramModel['KNC'] = {'model': KNeighborsClassifier(n_neighbors = 7, weights='distance')} #7
    paramModel['GNB'] = {'model': GaussianNB(var_smoothing=1e-09)}
    paramModel['MLP'] = {'model': MLPClassifier(hidden_layer_sizes = (69),max_iter = 10000 ,activation='relu', alpha=0.0001)}
    paramModel['VOT'] = {'model': VotingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']),
                                                               ('GNB', paramModel['GNB']['model']),('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                               ('MLP', paramModel['MLP']['model'])
                                                               ],
                                                   voting='hard')}
    paramModel['STK'] = {'model': StackingClassifier(estimators=[('GBC', paramModel['GBC']['model']), ('LRC', paramModel['LRC']['model']),
                                                                 ('GNB', paramModel['GNB']['model']),('DTC', paramModel['DTC']['model']), ('KNC', paramModel['KNC']['model']), ('RFC', paramModel['RFC']['model']),
                                                                 ('MLP', paramModel['MLP']['model'])
                                                                 ],
                                                     final_estimator=LogisticRegression())}
    for i in paramModel.keys():
        paramModel[i]['model'].fit(BXtrain, Bytrain)
        # Held-out metrics (stored under 'Val*').
        ypred = paramModel[i]['model'].predict(BXtest)
        accuracy = accuracy_score(Bytest, ypred)
        precision = precision_score(Bytest, ypred)
        recall = recall_score(Bytest, ypred)
        f1 = f1_score(Bytest, ypred)
        # In-sample metrics (stored under 'Learn*') to gauge overfitting.
        ypred2 = paramModel[i]['model'].predict(BXtrain)
        accuracy2 = accuracy_score(Bytrain, ypred2)
        precision2 = precision_score(Bytrain, ypred2)
        recall2 = recall_score(Bytrain, ypred2)
        f12 = f1_score(Bytrain, ypred2)
        paramModel[i]['LearnACC'] = accuracy2
        paramModel[i]['Learnprecision'] = precision2
        paramModel[i]['Learnrecall'] = recall2
        paramModel[i]['Learnf1'] = f12
        paramModel[i]['ValACC'] = accuracy
        paramModel[i]['Valprecision'] = precision
        paramModel[i]['Valrecall'] = recall
        paramModel[i]['Valf1'] = f1
    return paramModel
#-----------------------------------------------------------------------------------------------------
def plotFinalModel(model, p, data , title ,lev=1):
    """Plot cumulative buy-and-hold vs. model-strategy returns over the last *p* rows.

    NOTE(review): slices the first 77 columns (assumed full feature width) and
    treats data["returns"] as a DataFrame — this relies on TimeSeriesGen's
    duplicated column names; confirm against how *data* was built.
    """
    X = data.iloc[-p::,:77:]
    # Predict a 0/1 direction per row (the RHS runs before 'output' is added).
    X["output"] = model.predict(X.iloc[::,::])
    X["NormalReturns"] = X["returns"].iloc[:,0].cumsum()
    # Map 0 -> -1 so the signal is long/short.
    X["output"] = X["output"].replace(0 , -1)
    # Use the previous bar's signal against today's return (no look-ahead).
    X['outputshift'] = X['output'].shift(1)
    X['strategy'] = X["returns"].iloc[:,0] * X["outputshift"] * lev
    X["StrategyReturns"] = X["strategy"].cumsum()
    X[["NormalReturns","StrategyReturns"]].plot(figsize = (12 , 8))
    plt.title(title)
    plt.show()
#-----------------------------------------------------------------------------------------------------
def plotBestModel(BestDic, BXtest, Bestmetric='ValACC', lev=1, showResultDf=True, label='PLabel', showbest=False):
    """Select the best meta-model by *Bestmetric*, predict PLabel on BXtest and plot the chosen models.

    When showbest is True, only the single highest-Valacc positive row is
    plotted; otherwise every positively-labelled row is plotted.

    NOTE(review): this function reads `z` and `Maindf` from module globals
    (they are not parameters), and uses the key 'XTest' (capital T) while
    other helpers use 'Xtest' — verify both against how those objects were
    built; SumBestModelsImp() is the parameterised variant.
    """
    best_model_name = max(BestDic, key=lambda key: BestDic[key][Bestmetric])
    best_model_test_acc = BestDic[best_model_name]['ValACC']
    # Drop a stale prediction column if a previous call added one.
    if 'PLabel' in BXtest.columns:
        BXtest = BXtest.drop(['PLabel'] , axis=1)
    ypred = BestDic[best_model_name]['model'].predict(BXtest)
    print('best instrument-model selector is:',best_model_name,'with acc:',best_model_test_acc)
    result_df = pd.concat([BXtest, z], axis=1)
    result_df['PLabel'] = ypred
    if showbest==True:
        df = result_df[result_df[label] == 1]
        max_row = df[df['Valacc'] == df['Valacc'].max()]
        # Index looks like '<TICKER> N/P/G'; strip the 2-char suffix.
        ticker = max_row.index[0][:-2]
        TestData = Maindf[ticker]['XTest']
        ChosseModel = max_row['model'].values[0]
        display(ChosseModel)
        plotFinalModel(ChosseModel, TestData.shape[0], TestData , ticker , lev=lev)
    else:
        for i in range(1,result_df[result_df[label] == 1].shape[0]+1):
            ticker = result_df[result_df[label] == 1][i-1:i].index[0][:-2]
            TestData = Maindf[ticker]['XTest']
            ChosseModel = result_df[result_df[label] == 1][i-1:i]['model'].values[0]
            display(ChosseModel)
            plotFinalModel(ChosseModel, TestData.shape[0], TestData , ticker , lev=lev)
    if showResultDf==True:
        pd.set_option('display.max_columns', 9)
        display(result_df[result_df[label] == 1])
        pd.reset_option('display.max_columns')
#-------------------------------------------------------------------------------------------------
def PlotSumModel(Finaldf, symbolsreturn):
    """Plot aggregated strategy returns against aggregated buy-and-hold returns.

    Both inputs are expected to carry a 'column_sums' column (as produced by
    SumBestModels / SumBestModelsImp).
    """
    summary = pd.DataFrame()
    summary['ModelsReturn'] = Finaldf["column_sums"]
    summary['NormalReturn'] = symbolsreturn["column_sums"]
    summary[["ModelsReturn", "NormalReturn"]].plot(figsize=(12, 8))
    plt.title('sum of all tradings')
    plt.show()
#-------------------------------------------------------------------------------------------------
def SumBestModels(BestDic, BXtest, lev=1 ,label='PLabel', LimitRows=False):
    """Select positively-classified (ticker, model) rows and aggregate their test-set returns.

    Picks the meta-model with the best ValACC + Valprecision, predicts PLabel
    on BXtest, then for each selected row accumulates strategy and
    buy-and-hold cumulative returns; both output frames gain a forward-filled
    'column_sums' total.

    NOTE(review): reads `z` and `Maindf` from module globals (not parameters);
    SumBestModelsImp() is the parameterised version of this function.

    Returns (strategy-returns df, buy-and-hold df).
    """
    def Bestreturns(model, p, data ,lev=1):
        # Cumulative strategy vs. buy-and-hold returns over the last p rows.
        # NOTE(review): X["returns"] is presumably a DataFrame (duplicate
        # 'returns' columns from the lag concat); .iloc[:,0] takes the
        # un-shifted one — confirm.
        X = data.iloc[-p::,::]
        X["output"] = model.predict(X.iloc[::,::])
        X["NormalReturns"] = X["returns"].iloc[:,0].cumsum()
        X["output"] = X["output"].replace(0 , -1)
        # Trade on the previous bar's signal to avoid look-ahead.
        X['outputshift'] = X['output'].shift(1)
        X['strategy'] = X["returns"].iloc[:,0] * X["outputshift"] * lev
        X["StrategyReturns"] = X["strategy"].cumsum()
        return X["StrategyReturns"],X["NormalReturns"]
    # Best meta-model by combined accuracy + precision on the validation set.
    best_model_name = max(BestDic, key=lambda key: BestDic[key]["ValACC"]+BestDic[key]["Valprecision"])
    best_model_test_acc = BestDic[best_model_name]['ValACC']
    if 'PLabel' in BXtest.columns:
        BXtest = BXtest.drop(['PLabel'] , axis=1)
    ypred = BestDic[best_model_name]['model'].predict(BXtest)
    print('best instrument-model selector is:',best_model_name,'with acc:',best_model_test_acc)
    result_df = pd.concat([BXtest, z], axis=1)
    result_df['PLabel'] = ypred
    finaldic = {}
    symbolreturn = {}
    df1 = result_df[result_df['PLabel'] == 1]
    selected_rows = df1[df1['ValProfit %'] > 100]
    if LimitRows == False:
        SelectedDF = df1
    else:
        SelectedDF = selected_rows
    for i in range(1,SelectedDF.shape[0]+1):
        # Index looks like '<TICKER> N/P/G'; strip the 2-char suffix.
        ticker = SelectedDF[i-1:i].index[0][:-2]
        TestData = Maindf[ticker]['Xtest']
        ChosseModel = SelectedDF[i-1:i]['model'].values[0]
        finaldic[ticker+str(i)], symbolreturn[ticker+str(i)] = Bestreturns(ChosseModel, TestData.shape[0], TestData ,lev=lev)
    df1 = pd.DataFrame(finaldic)
    df2 = pd.DataFrame(symbolreturn)
    df1.fillna(method='ffill', inplace=True)
    df2.fillna(method='ffill', inplace=True)
    df1['column_sums'] = df1.sum(axis=1)
    df2['column_sums'] = df2.sum(axis=1)
    return df1 ,df2
#-----------------------------------------------------------------------------------------------------
def SumBestModelsImp(MainIdf, BestDic, BXtest, z, lev=1, label='PLabel', LimitRows=False):
    """Select positively-classified (ticker, model) rows and aggregate their test-set returns.

    Parameterised version of SumBestModels(): picks the meta-model with the
    best ValACC + Valprecision, predicts PLabel on BXtest, then for each
    selected row accumulates strategy and buy-and-hold cumulative returns.

    Parameters
    ----------
    MainIdf : dict of ticker -> {'Xtest': test-feature DataFrame, ...}.
    BestDic : output of BestModelFinder(): name -> {'model', 'ValACC', ...}.
    BXtest : meta-features of the candidate rows (indexed '<TICKER> N/P/G').
    z : held-out result columns aligned with BXtest (from TrainValTestSplit).
    lev : leverage multiplier applied to the per-bar strategy return.
    label : kept for interface compatibility; selection uses 'PLabel'.
    LimitRows : when True, additionally require 'ValProfit %' > 100.

    Returns (strategy-returns df, buy-and-hold df, selected rows df); the
    first two gain a forward-filled 'column_sums' cross-instrument total.
    """
    def Bestreturns(model, p, data, lev=1):
        # Cumulative strategy vs. buy-and-hold returns over the last p rows.
        # NOTE(review): X["returns"] is presumably a DataFrame (duplicate
        # 'returns' columns from the lag concat); .iloc[:, 0] takes the
        # un-shifted one — confirm against TimeSeriesGen's output.
        X = data.iloc[-p::, ::]
        X["output"] = model.predict(X.iloc[::, ::])
        X["NormalReturns"] = X["returns"].iloc[:, 0].cumsum()
        X["output"] = X["output"].replace(0, -1)
        # Trade on the previous bar's signal to avoid look-ahead.
        X['outputshift'] = X['output'].shift(1)
        X['strategy'] = X["returns"].iloc[:, 0] * X["outputshift"] * lev
        X["StrategyReturns"] = X["strategy"].cumsum()
        return X["StrategyReturns"], X["NormalReturns"]
    #------------------------------------------------------------------
    # Best meta-model by combined accuracy + precision on the validation set.
    best_model_name = max(BestDic, key=lambda key: BestDic[key]["ValACC"] + BestDic[key]["Valprecision"])
    best_model_test_acc = BestDic[best_model_name]['ValACC']
    # Drop a stale prediction column if a previous call added one.
    if 'PLabel' in BXtest.columns:
        BXtest = BXtest.drop(['PLabel'], axis=1)
    ypred = BestDic[best_model_name]['model'].predict(BXtest)
    print('best instrument-model selector is:',best_model_name,'with acc:',best_model_test_acc)
    result_df = pd.concat([BXtest, z], axis=1)
    result_df['PLabel'] = ypred
    finaldic = {}
    symbolreturn = {}
    df1 = result_df[result_df['PLabel'] == 1]
    selected_rows = df1[df1['ValProfit %'] > 100]
    SelectedDF = selected_rows if LimitRows else df1
    for i in range(1, SelectedDF.shape[0] + 1):
        # Index looks like '<TICKER> N/P/G'; strip the 2-char suffix.
        ticker = SelectedDF[i-1:i].index[0][:-2]
        TestData = MainIdf[ticker]['Xtest']
        ChosseModel = SelectedDF[i-1:i]['model'].values[0]
        finaldic[ticker+str(i)], symbolreturn[ticker+str(i)] = Bestreturns(ChosseModel, TestData.shape[0], TestData, lev=lev)
    df1 = pd.DataFrame(finaldic)
    df2 = pd.DataFrame(symbolreturn)
    # FIX: fillna(method='ffill') is deprecated (removed in recent pandas);
    # ffill() is the supported equivalent.
    df1 = df1.ffill()
    df2 = df2.ffill()
    df1['column_sums'] = df1.sum(axis=1)
    df2['column_sums'] = df2.sum(axis=1)
    return df1, df2, SelectedDF
# Your New Python File
from AlgorithmImports import *
from lib import *
import warnings
from sklearn.exceptions import UndefinedMetricWarning, ConvergenceWarning
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=pd.errors.SettingWithCopyWarning)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import yfinance as yf
import pandas_ta as ta
from IPython.core.display import display, HTML
from collections import Counter
import time
from datetime import datetime
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.ensemble import StackingClassifier
from sklearn.svm import LinearSVC
from sklearn.svm import SVC as SupportVectorClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve, roc_auc_score
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
class MyAlgorithm(QCAlgorithm):
    """QuantConnect algorithm: daily ML-driven long/short positions.

    On initialization it downloads daily history per ticker, loads a pickled
    DataFrame of pre-trained (ticker, model) rows from the Object Store, and
    schedules order placement at 10:00 each day with a liquidation pass
    scheduled separately.
    """

    def Initialize(self):
        """Set dates/cash, download history per symbol, load models, schedule trading."""
        self.SetStartDate(2023, 1, 1)
        self.SetCash(1000000)
        self.symbols = ["AAPL", "MSFT", "GOOG", "AMZN", "FB", "TSLA", "NFLX", "NVDA", "BABA", "JPM", "BND", "SPY"]
        #self. symbols = ["AMZN", "TSLA", "NFLX", "NVDA", "AAPL", "MSFT"]
        self.shifting = 1  # number of lag copies fed to TimeSeriesGen
        self.instList = []  # (Symbol object, ticker string) pairs
        self.MainIdf = {}  # ticker -> {'DataFrame': renamed OHLCV history}
        qb = QuantBook()  # NOTE(review): unused below — confirm it is needed
        # Download historical data and set leverage
        self.Log('downloads: -------------------------------------------------------')
        for symbol in self.symbols:
            equity = self.AddEquity(symbol, Resolution.Daily)
            equity.SetLeverage(2)
            history = self.History(equity.Symbol, 2000, Resolution.Daily)
            # History is indexed by (symbol, time); take this symbol's slice.
            df = history.loc[symbol]
            # Rename to the capitalised column names the feature helpers expect.
            df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close', 'volume': 'Volume'}, inplace=True)
            self.MainIdf[symbol] = {'DataFrame' : df}
            self.instList.append((equity.Symbol, symbol))
            self.Log(f"{symbol} : Downloaded")
        self.Log('outputs: -------------------------------------------------------')
        # Load DataFrame from pickle
        import pickle
        import io
        import base64
        if self.ObjectStore.ContainsKey('data1.pkl'):
            # Stored as base64 text; decode back into bytes before unpickling.
            base64_str = self.ObjectStore.Read('data1.pkl')
            data = base64.b64decode(base64_str)
            buffer = io.BytesIO(data)
            buffer.seek(0)
            self.SelectedF = pickle.load(buffer)
            self.Debug(f"DataFrame with models loaded successfully: {self.SelectedF}")
        else:
            self.Debug("File not found in Object Store")
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.PlaceOrders)
        # NOTE(review): DateRules.Tomorrow vs DateRules.EveryDay() — confirm
        # this close schedule actually fires every day rather than once.
        self.Schedule.On(self.DateRules.Tomorrow, self.TimeRules.At(10, 0), self.CloseOrders)

    def OnData(self, data):
        """No per-tick logic; all trading is schedule-driven."""
        pass

    def PlaceOrders(self):
        """Rebuild today's feature row per selected ticker, predict, and trade it."""
        self.Log('orders: -------------------------------------------------------')
        ordertype = {}
        for i in range(1, self.SelectedF.shape[0] + 1):
            # Row index looks like '<TICKER> N/P/G'; strip the 2-char suffix.
            ticker = self.SelectedF.iloc[i - 1].name[:-2]
            instrument = None
            # Look up the Symbol object registered for this ticker.
            for j in range(len(self.instList)):
                if self.instList[j][1] == ticker:
                    instrument = self.instList[j][0]
                    self.Log(instrument)
                    break
            # Rebuild features exactly as during training, on a recent window.
            TData = self.MainIdf[ticker]['DataFrame'][-200:]
            ToData = AddOsil(TData)
            TodData = AddLabel(ToData)
            TodaData, y = TimeSeriesGen(TodData, self.shifting)
            TodayData = TodaData[-1:]
            ChosenModel = self.SelectedF.iloc[i - 1]['model']
            order = ChosenModel.predict(TodayData)
            self.Log(order)
            if order[0] == 1:
                self.Log(f'{instrument} Buy Order Placed for ticker: {ticker}')
                # NOTE(review): weight 1 targets 100% of portfolio value per
                # ticker; with multiple signals this stacks leverage — confirm
                # this sizing is intended.
                self.SetHoldings(instrument, 1) # Long position
                ordertype[ticker + str(i)] = 'Buy'
            elif order[0] == 0:
                self.Log(f'{instrument} Sell Order Placed for ticker: {ticker}')
                self.SetHoldings(instrument, -1) # Short position
                ordertype[ticker + str(i)] = 'Sell'
        self.Log('---------------------------------------------------------')
        self.Log('Orders Placement is done.')

    def CloseOrders(self):
        """Flatten every open position."""
        self.Log('Closing all positions after 24 hours')
        self.Liquidate()