| Overall Statistics |
|
Total Trades 1225 Average Win 1.89% Average Loss -1.64% Compounding Annual Return 35.692% Drawdown 49.400% Expectancy -0.033 Net Profit 530.504% Sharpe Ratio 0.963 Loss Rate 55% Win Rate 45% Profit-Loss Ratio 1.15 Alpha 0.63 Beta -20.317 Annual Standard Deviation 0.309 Annual Variance 0.096 Information Ratio 0.91 Tracking Error 0.309 Treynor Ratio -0.015 Total Fees $1529.01 |
import numpy as np
import datetime
from scipy import stats
class StocksOnTheMove(QCAlgorithm):
'''Basic template algorithm simply initializes the date range and cash'''
def Initialize(self):
'''Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.'''
self.SetStartDate(2012,1,1) #Set Start Date
#self.SetEndDate(2012,3,1) #Set End Date
self.SetCash(100000) #Set Strategy Cash
# Find more symbols here: http://quantconnect.com/data
self.AddEquity("SPY", Resolution.Minute)
# what resolution should the data *added* to the universe be?
self.UniverseSettings.Resolution = Resolution.Daily
# How many stocks in the starting universe?
self.__numberOfSymbols = 200
# How many stocks in the portfolio?
self.number_stocks = 30
# this add universe method accepts two parameters:
self.AddUniverse(self.CoarseSelectionFunction)
# How far back are we looking for momentum?
self.momentum_period = 20
# Schedule Indicator Update, Ranking + Rebal
self.Schedule.On(self.DateRules.MonthStart("SPY"),
self.TimeRules.AfterMarketOpen("SPY", 30),
Action(self.rebalance))
self.Schedule.On(self.DateRules.MonthStart("SPY"),
self.TimeRules.BeforeMarketClose("SPY", 0),
Action(self.UpdateIndicators))
# Set Risk Factor for position sizing
self.risk_factor = 0.001
# Set empty list for universe
self.universe = []
# Set empty dictionary for managing & ranking the slope
self.indicators_r2 = {}
self.last_month_fired_coarse = None #we cannot rely on Day==1 like before
self.last_month_fired_rebalance = None #we cannot rely on Day==1 like before
def UpdateIndicators(self):
# This updates the indicators at each data step
for symbol in self.universe:
# is symbol iin Slice object? (do we even have data on this step for this asset)
if self.Securities.ContainsKey(symbol):
# Update the dictionary for the indicator
if symbol in self.indicators_r2:
self.indicators_r2[symbol].update(self.Securities[symbol].Price)
# Run a coarse selection filter for starting universe
def CoarseSelectionFunction(self, coarse):
today = self.Time
#self.Log("Day = {} Month = {}".format(today.day,today.month))
# Set the Universe to rebalance on the 1st day of each quarter (can play around with this as required)
if self.last_month_fired_coarse != today.month:
self.last_month_fired_coarse = today.month
self.Log("Day = {} Month = {}".format(today.day,today.month))
CoarseWithFundamental = [x for x in coarse if x.HasFundamentalData]
sortedByDollarVolume = sorted(CoarseWithFundamental, key=lambda x: x.DollarVolume, reverse=True)
result = [ x.Symbol for x in sortedByDollarVolume[:self.__numberOfSymbols] ]
self.universe = result
return self.universe
else:
return self.universe
def OnSecuritiesChanged(self, changes):
# Delete indicator from the dict to save Ram
for security in changes.RemovedSecurities:
if security.Symbol in self.indicators_r2:
del self.indicators_r2[security.Symbol]
self.Liquidate(security.Symbol)
# Init a new custom indicator
for security in changes.AddedSecurities:
self.indicators_r2[security.Symbol] = RegressionSlope(self, security.Symbol, self.momentum_period, Resolution.Daily)
def rebalance(self):
today = self.Time
if self.last_month_fired_rebalance != self.last_month_fired_coarse:
# ensure we are fireing after coarse
self.last_month_fired_rebalance = self.last_month_fired_coarse
self.Log("Rebalance")
# get values from dict
symbols, slopes = zip(*[(symbol, self.indicators_r2[symbol].value) \
for symbol in self.indicators_r2 \
if self.indicators_r2[symbol].value is not None])
# sort
idx_sorted = np.argsort(slopes)[::-1] # [::-1] slices backwards i.e. flips to reverse the sort order
symbols =np.array(symbols)[idx_sorted]
slopes = np.array(slopes)[idx_sorted]
# Sort the Dictionary from highest to lowest and take the top values
self.target_portfolio = symbols
self.Log(str(self.target_portfolio))
# Enter or exit positions
for symbol in self.universe:
# Case: invested in the current symbol
if self.Portfolio[symbol].HoldStock:
# Exit if not a target aset
if symbol not in self.target_portfolio:
self.Liquidate(symbol)
elif symbol in self.target_portfolio:
continue
# Case: not invested in the current symbol
else:
# symbol is a target, enter position
if symbol in self.target_portfolio:
# Update ATR for the stock in the new dictionary
self.Log("{} {} {}".format(symbol, self.Securities[symbol].Price, self.indicators_r2[symbol].value))
# Send Orders
self.SetHoldings(symbol, 1./float(self.number_stocks))
class RegressionSlope():
def __init__(self, algo, symbol, window, resolution):
# set up params of per-asset rolling metric calculation
self.symbol = symbol
self.window = window
self.resolution = resolution
# the value we access, None until properly calulated
self.value = None
# We will store the historical window here, and keep it a fixed length in update
self.history = []
# download the window. Prob not great to drag algo scope in here. Could get outside and pass in.
hist_df = algo.History([symbol], window, self.resolution)
# Case where no data to return for this asset. New asset?
if 'close' not in hist_df.columns:
return
# store the target time series
self.history = hist_df.close.values
# calulate the metrics for the current window
self.compute()
def update(self, value):
# update history, retain length
self.history = np.append(self.history, float(value))[1:]
# calulate the metrics for the current window
self.compute()
def compute(self):
# Case where History faiiled to return window, waiting to acrew
# prevent calc until window is statisfied
if len(self.history) < self.window:
return
# copied from previous
x = np.arange(len(self.history))
log_ts = np.log(self.history)
slope, intercept, r_value, p_value, std_err = stats.linregress(x, log_ts)
annualized_slope = (np.power(np.exp(slope), 250) - 1) * 100
annualized_slope = annualized_slope * (r_value ** 2)
# update value
self.value = annualized_slope