| Overall Statistics |
|
Total Trades 68 Average Win 11.23% Average Loss -0.16% Compounding Annual Return 7.875% Drawdown 12.700% Expectancy 2.615 Net Profit 7.875% Sharpe Ratio 0.548 Loss Rate 95% Win Rate 5% Profit-Loss Ratio 71.31 Alpha 0.144 Beta -4.58 Annual Standard Deviation 0.127 Annual Variance 0.016 Information Ratio 0.42 Tracking Error 0.127 Treynor Ratio -0.015 Total Fees $138.08 |
from math import ceil,floor
from datetime import datetime
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from QuantConnect.Data.UniverseSelection import *
class TrendFollowingAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 1, 1)
self.SetEndDate(2016, 1, 1)
self.SetCash(100000)
self.reb = 1
self.symbols = None
self.lookback = 252/2
self.profittake = 1.96 # 95% bollinger band
self.maxlever = 0.9 # always hold 10% Cash
self.AddEquity("SPY", Resolution.Minute)
self.multiple = 5.0 # 1% of annual return translate to what weight e.g. 5%
self.PctDailyVolatilityTarget = 0.025 # target daily vol target in %
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY"), Action(self.universe))
self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 10), Action(self.trail_stop))
self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 28), Action(self.regression))
self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 30), Action(self.trade))
def OnData(self, data):
pass
def calc_vol_scalar(self):
# df_price = pd.DataFrame(self.price, columns=self.price.keys())
df_price = pd.DataFrame(dict([ (key,pd.Series(value)) for key, value in self.price.iteritems() ]))
rets = np.log(df_price).diff().dropna()
lock_value = df_price.iloc[-1]
price_vol = self.calc_std(rets)
volatility_scalar = self.PctDailyVolatilityTarget / price_vol
return volatility_scalar
def calc_std(self, returns):
downside_only = False
if (downside_only):
returns = returns.copy()
returns[returns > 0.0] = np.nan
# Exponentially-weighted moving std
b = returns.ewm(halflife=20, ignore_na=True, min_periods=0, adjust=True).std(bias=False).dropna()
return b.iloc[-1]
def regression(self):
if self.symbols is None: return
history = self.History(self.symbols, self.lookback, Resolution.Daily)
current = self.History(self.symbols, 28, Resolution.Minute)
self.price = {}
for symbol in self.symbols:
self.price[symbol.Value] = list(history.loc[str(symbol)]['open'])
self.price[symbol.Value].append(current.loc[str(symbol)]['open'][0])
A = range( self.lookback + 1 )
for symbol in self.symbols:
# volatility
std = np.std(self.price[symbol.Value])
# Price points to run regression
Y = self.price[symbol.Value]
# Add column of ones so we get intercept
X = np.column_stack([np.ones(len(A)), A])
if len(X) != len(Y):
length = min(len(X), len(Y))
X = X[-length:]
Y = Y[-length:]
A = A[-length:]
# Creating Model
reg = LinearRegression()
# Fitting training data
reg = reg.fit(X, Y)
# run linear regression y = ax + b
b = reg.intercept_
a = reg.coef_[1]
# Normalized slope
slope = a / b *252.0
# Currently how far away from regression line
delta = Y - (np.dot(a, A) + b)
# Don't trade if the slope is near flat (at least %7 growth per year to trade)
slope_min = 0.252
# Long but slope turns down, then exit
if symbol.weight > 0 and slope < 0:
symbol.weight = 0
# short but slope turns upward, then exit
if symbol.weight < 0 and slope < 0:
symbol.weight = 0
# Trend is up
if slope > slope_min:
# price crosses the regression line
if delta[-1] > 0 and delta[-2] < 0 and symbol.weight == 0:
symbol.stopprice = None
symbol.weight = slope
# Profit take, reaches the top of 95% bollinger band
if delta[-1] > self.profittake * std and symbol.weight > 0:
symbol.weight = 0
# Trend is down
if slope < -slope_min:
# price crosses the regression line
if delta[-1] < 0 and delta[-2] > 0 and symbol.weight == 0:
symbol.stopprice = None
symbol.weight = slope
# profit take, reaches the top of 95% bollinger band
if delta[-1] < self.profittake * std and symbol.weight < 0:
symbol.weight = 0
def trade(self):
if self.symbols is None: return
vol_mult = self.calc_vol_scalar()
no_positions = 0
for symbol in self.symbols:
if symbol.weight != 0:
no_positions += 1
for symbol in self.symbols:
if symbol.weight == 0:
self.SetHoldings(symbol, 0)
elif symbol.weight > 0:
self.SetHoldings(symbol, (min(symbol.weight * self.multiple, self.maxlever)/no_positions)*vol_mult[symbol.Value])
elif symbol.weight < 0:
self.SetHoldings(symbol, (max(symbol.weight * self.multiple, -self.maxlever)/no_positions)*vol_mult[symbol.Value])
def trail_stop(self):
if self.symbols is None: return
hist = self.History(self.symbols, 3, Resolution.Daily)
for symbol in self.symbols:
mean_price = (hist.loc[str(symbol)]['close']).mean()
# Stop loss percentage is the return over the lookback period
stoploss = abs(symbol.weight * self.lookback / 252.0) + 1 # percent change per period
if symbol.weight > 0:
if symbol.stopprice < 0:
symbol.stopprice = mean_price / stoploss
else:
symbol.stopprice = max(mean_price / stoploss, symbol.stopprice)
if mean_price < symbol.stopprice:
symbol.weight = 0
self.Liquidate(symbol)
elif symbol.weight < 0:
if symbol.stopprice < 0:
symbol.stopprice = mean_price * stoploss
else:
symbol.stopprice = min(mean_price * stoploss, symbol.stopprice)
if mean_price > symbol.stopprice:
symbol.weight = 0
self.Liquidate(symbol)
else:
symbol.stopprice = None
# def load_symbols(self) :
# self.equities = [
# # Equity
# 'DIA', # Dow
# 'SPY', # S&P 500
# ]
# self.fixedincome = [
# # Fixed income
# 'IEF', # Treasury Bond
# 'HYG', # High yield bond
# ]
# self.alternative = [
# 'USO', # Oil
# 'GLD', # Gold
# 'VNQ', # US Real Estate
# 'RWX', # Dow Jones Global ex-U.S. Select Real Estate Securities Index
# 'UNG', # Natual gas
# 'DBA', # Agriculture
# ]
# syl_list = self.equities + self.fixedincome + self.alternative
# self.symbols = []
# for i in syl_list:
# self.symbols.append(self.AddEquity(i, Resolution.Minute).Symbol)
def CoarseSelectionFunction(self, coarse):
if self.reb != 1:
return []
# sort descending by daily dollar volume
sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
# return the symbol objects of the top entries from our sorted collection
return [ x.Symbol for x in sortedByDollarVolume[:8] ]
# sort the data by P/E ratio and take the top 'NumberOfSymbolsFine'
def FineSelectionFunction(self, fine):
if self.reb != 1:
return []
self.reb = 0
# sort descending by P/E ratio
syl_list = sorted(fine, key=lambda x: x.ValuationRatios.PERatio, reverse=True)
self.symbols = [ x.Symbol for x in syl_list[:5]]
self.Log("The candidate universe: " + str([i.Value for i in self.symbols]))
# initialize the weight and stopprice of the symbo object
for symbol in self.symbols:
symbol.weight = 0
symbol.stopprice = None
return self.symbols
def universe(self):
self.reb = 1
return