| Overall Statistics |
|
Total Trades 6807 Average Win 0.28% Average Loss -0.23% Compounding Annual Return 5.040% Drawdown 12.100% Expectancy 0.062 Net Profit 48.200% Sharpe Ratio 0.704 Loss Rate 51% Win Rate 49% Profit-Loss Ratio 1.19 Alpha 0.064 Beta -0.608 Annual Standard Deviation 0.074 Annual Variance 0.005 Information Ratio 0.433 Tracking Error 0.074 Treynor Ratio -0.085 Total Fees $13416.29 |
import math
import bisect
import operator
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import random
import talib as tb
import numpy as np
import pandas as pd
import sklearn as sn
from sklearn.neighbors import KNeighborsRegressor as KNR
from sklearn.linear_model import LinearRegression as LR
from sklearn.tree import DecisionTreeRegressor as DTR
from sklearn.ensemble import RandomForestRegressor as RFR
from clr import AddReference
from scipy.optimize import brute
from sklearn.model_selection import train_test_split as TTS
from sklearn.metrics import mean_absolute_error as mae
from sklearn.metrics import mean_squared_error as mse
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
seed = 1
random.seed(seed)
np.random.seed(seed)
class Model():
def __init__(self):
# len of hitory
self.eval_lookback = 500
self.train_lookback = 3000
self.n_features = 5
self.init = 0
self.warmup_count = self.train_lookback + self.eval_lookback
self.ttrig = 1
def func(self,x):
if np.count_nonzero(x) < 2 or np.count_nonzero(x) > 5:
return 10000.0
reg = KNR(n_neighbors=5, weights='distance')
X,Y = self.indicators(x)
X_train, X_test, Y_train, Y_test = TTS(X, Y, test_size=0.33,shuffle=False)
scaler_x = StandardScaler()
X_train = scaler_x.fit_transform(X_train)
X_test = scaler_x.transform(X_test)
reg.fit(X_train,Y_train)
Y_pred = reg.predict(X_test)
f1 = Y_test >= 0
f2 = Y_pred >= 0
f3 = Y_test < 0
f4 = Y_pred < 0
both_positive = f1 & f2
both_neg = f3 & f4
Y_pred[both_positive] = 0
Y_test[both_positive] = 0
Y_pred[both_neg] = 0
Y_test[both_neg] = 0
sharpe = mae(Y_test,Y_pred)
return sharpe
def indicators(self, x, how='fit'):
opn = self.market[:,0]
high = self.market[:,1]
low = self.market[:,2]
close = self.market[:,3]
vol = self.market[:,4]
df_xy = np.zeros((self.market.shape[0],10),float)
if x[0] != 0:
df_xy[:,0] = tb.CCI(high, low, close, timeperiod=x[0])
if x[1] != 0:
df_xy[:,1] = tb.AROONOSC(high,low, timeperiod=x[1])
if x[2] != 0:
df_xy[:,2] = tb.SMA(close, timeperiod=x[2])/close
if x[3] != 0:
df_xy[:,3] = tb.SMA(close, timeperiod = x[3])/close
if x[4] != 0:
df_xy[:,4] = tb.ATR(high, low, close, timeperiod=x[4])
if x[5] != 0:
df_xy[:,5] = tb.NATR(high, low, close, timeperiod=x[5])
if x[6] != 0:
df_xy[:,6] = tb.CCI(high, low, close, timeperiod=x[6])
if x[7] != 0:
df_xy[:,7] = tb.MFI(high, low, close, vol, timeperiod=x[7])
if x[8] != 0:
df_xy[:,8] = tb.ATR(high, low, close, timeperiod=x[8])
if x[9] != 0:
df_xy[:,9] = tb.ATR(high, low, close, timeperiod=x[9])
y_period = 200
y = (tb.SMA(close,timeperiod=y_period)[y_period:]-close[:-y_period])/close[:-y_period]
y = y[500:]
y = y.reshape(-1,1)
x = df_xy[-y.shape[0]-y_period:-y_period,:]
x[abs(x)>1e5] = 0
y[abs(y)>1] = 0
if how == 'predict':
return df_xy[-1,:].reshape(1,-1)
return x,y
def find_solution(self):
step = 250
reg = KNR()
ranges = (slice(0, 500, step),) * 10
for i in range(7):
x0,fval,grid,Jout = brute(self.func, ranges=ranges, disp=True, finish=None, full_output=True)
step = int(step/1.5)
ranges = ((np.max([x0[0]-step,0]),np.min([x0[0]+step,500]),step),
(np.max([x0[1]-step,0]),np.min([x0[1]+step,500]),step),
(np.max([x0[2]-step,0]),np.min([x0[2]+step,500]),step),
(np.max([x0[3]-step,0]),np.min([x0[3]+step,500]),step),
(0,1,2),
(np.max([x0[5]-step,0]),np.min([x0[5]+step,500]),step),
(np.max([x0[6]-step,0]),np.min([x0[6]+step,500]),step),
(np.max([x0[7]-step,0]),np.min([x0[7]+step,500]),step),
(0,1,2),
(0,1,2))
X,Y = self.indicators(x0)
scaler = StandardScaler()
X = scaler.fit_transform(X)
reg.fit(X,Y)
return x0, reg, scaler
def preprocessing_market(self):
self.market = np.zeros((np.array(list(self.hist_open), float).shape[0],5),float)
self.market[:,0] = np.array(list(self.hist_open)[::-1], float)
self.market[:,1] = np.array(list(self.hist_high)[::-1], float)
self.market[:,2] = np.array(list(self.hist_low)[::-1], float)
self.market[:,3] = np.array(list(self.hist_close)[::-1], float)
self.market[:,4] = np.array(list(self.hist_vol)[::-1], float)
class Position():
def __init__(self):
self.max_position = 4.0
self.num_of_stds = 14.0
self.price = 0.0
self.position = 0.0
self.volatility = 0.0
self.current_price = 0.0
self.pl_fac = 1.0
def manage(self):
if self.position != 0.0:
sign = abs(self.position)/self.position
if (self.price+self.num_of_stds*self.volatility < self.current_price and self.position>0.0):
self.price = self.current_price
if abs(self.position) < self.max_position:
self.position += 1.0
return 1.0
else:
return 0.0
if (self.price-self.num_of_stds*self.volatility > self.current_price and self.position<0.0):
self.price = self.current_price
if abs(self.position) < self.max_position:
self.position -= 1.0
return 1.0
else:
return 0.0
if (self.price-self.volatility*(self.num_of_stds)/self.pl_fac > self.current_price and self.position>0):
return -1.0
if (self.price+self.volatility*(self.num_of_stds)/self.pl_fac < self.current_price and self.position<0):
return -1.0
return 0.0
class BasicTemplateAlgorithm(QCAlgorithm):
def Initialize(self):
self.model = Model()
self.position1 = Position()
self.cash = 100000
self.SetStartDate(2010,1,1) #Set Start Date
self.SetEndDate(2018,1,1) #Set End Date
self.SetCash(self.cash) #Set Strategy Cash
self.multiplier = 100.0
self.init_coef = 0.6 # initial trade size - fraction of the deposit
self.add_coef = 0.2 # position management trade size - fraction of the deposit
# Find more symbols here: http://quantconnect.com/data
self.symbol = "SPY"
self.model.symbol = self.symbol
self.granularity = Resolution.Minute
self.position_size = 0.6
self.HighBar = RollingWindow[float](self.model.warmup_count)
self.LowBar = RollingWindow[float](self.model.warmup_count)
self.OpenBar = RollingWindow[float](self.model.warmup_count)
self.CloseBar = RollingWindow[float](self.model.warmup_count)
self.VolBar = RollingWindow[float](self.model.warmup_count)
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
mm = self.AddEquity(self.symbol, self.granularity)
mm.MarginModel = PatternDayTradingMarginModel()
self.SetWarmUp(self.model.warmup_count)
self.consolidator = TradeBarConsolidator(5)
self.consolidator.DataConsolidated += self.OnDataConsolidated
self.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator)
sPlot = Chart('Strategy Equity')
self.atr_slow = self.ATR(self.symbol, 160, MovingAverageType.Simple, Resolution.Minute);
self.atr_fast = self.ATR(self.symbol, 30, MovingAverageType.Simple, Resolution.Minute);
#self.filter1.volatility = self.atr_fast
#self.filter1.current_price = float(self.Securities[self.symbol].Price)
self.Schedule.On(self.DateRules.MonthStart(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), \
Action(self.reset_train))
self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.BeforeMarketClose(self.symbol,20), \
Action(self.liqui))
self.counter = 0
def liqui(self):
self.Liquidate(self.symbol)
self.position1.position = 0.0
def OnDataConsolidated(self, sender, TradeBar):
try:
self.HighBar.Add(float(TradeBar.High))
self.LowBar.Add(float(TradeBar.Low))
self.OpenBar.Add(float(TradeBar.Open))
self.CloseBar.Add(float(TradeBar.Close))
self.VolBar.Add(float(TradeBar.Volume))
except:
self.Debug('Failed to retrieve the quotes')
self.counter += 1
if self.counter >= self.model.warmup_count:
self.model.ttrig = 1
stock_coef = self.cash/float(self.Securities[self.symbol].Price)
self.model.hist_open = self.OpenBar
self.model.hist_close = self.CloseBar
self.model.hist_high = self.HighBar
self.model.hist_low = self.LowBar
self.model.hist_vol = self.VolBar
self.model.preprocessing_market()
if self.model.market.shape[0] < self.model.warmup_count:
return
if self.model.ttrig == 1:
self.x0,self.reg,self.scaler = self.model.find_solution()
self.model.ttrig = 0
self.counter = 0
X_current = self.model.indicators(self.x0,how='predict')
self.Debug(str(self.x0))
X_curr_scaled = self.scaler.transform(X_current)
prediction = self.reg.predict(X_curr_scaled)
holdings = float(self.Portfolio[self.symbol].Quantity)
self.position1.volatility = float(self.atr_fast.Current.Value)
self.position1.current_price = float(self.Securities[self.symbol].Price)
#position management
if self.position1.position != 0.0 and holdings != 0.0:
self.position1.volatility = float(self.atr_fast.Current.Value)
action = self.position1.manage()
if self.position1.position > 0:
direc = OrderDirection.Buy
else:
direc = OrderDirection.Sell
bp = float(self.Portfolio.GetMarginRemaining(self.symbol))
if self.position1.position < 0:
ordr = -1
else:
ordr = 1
if action == 1.0 and abs(bp) > abs(stock_coef*self.add_coef)*1.5*self.position1.current_price:
self.MarketOrder(self.symbol, ordr*stock_coef*self.add_coef)
self.position1.price = float(self.Securities[self.symbol].Price)
if action == -1.0:
self.position1.position = 0.0
self.Liquidate(self.symbol)
else:
return
bp = float(self.Portfolio.GetMarginRemaining(self.symbol))
if self.position1.position == 0.0 and holdings == 0.0:
if prediction > 0.0 and abs(bp) > stock_coef*self.init_coef*self.position1.current_price:
self.position1.position = 1.0
self.MarketOrder(self.symbol, stock_coef*self.init_coef)
self.position1.price = float(self.Securities[self.symbol].Price)
self.position1.current_price = float(self.Securities[self.symbol].Price)
self.position1.volatility = float(self.atr_fast.Current.Value)
if prediction < 0.0 and abs(bp) > stock_coef*self.init_coef*self.position1.current_price:
self.position1.position = -1.0
self.MarketOrder(self.symbol, -stock_coef*self.init_coef)
self.position1.volatility = float(self.atr_fast.Current.Value)
self.position1.price = float(self.Securities[self.symbol].Price)
self.position1.current_price = float(self.Securities[self.symbol].Price)
if float(self.Portfolio[self.symbol].Quantity) != 0 and self.position1.position ==0:
self.position1.position = abs(float(self.Portfolio[self.symbol].Quantity))/float(self.Portfolio[self.symbol].Quantity)
self.Debug('ACHTUNG!!!!')
self.previous = self.Time
def reset_train(self):
self.model.ttrig = 1