| Overall Statistics |
|
Total Trades 4494 Average Win 0.08% Average Loss -0.06% Compounding Annual Return -3.481% Drawdown 28.100% Expectancy -0.089 Net Profit -11.193% Sharpe Ratio -0.264 Probabilistic Sharpe Ratio 0.501% Loss Rate 59% Win Rate 41% Profit-Loss Ratio 1.23 Alpha -0.02 Beta -0.045 Annual Standard Deviation 0.092 Annual Variance 0.009 Information Ratio -0.558 Tracking Error 0.215 Treynor Ratio 0.541 Total Fees $4770.97 |
import statsmodels.api as sm
import pandas as pd
import numpy as np
class ResidualMomentumAlphaModel(AlphaModel):
    """
    Alpha model that ranks the tracked securities by their residual momentum
    score once per calendar month. It emits one group of insights: long for the
    top 10% of scored symbols and short for the bottom 10%.

    Attributes:
    - symbol_data
        Dictionary mapping each tracked Symbol to its `ResidualMomentum` object
    - fama_french_factors
        DataFrame holding at most the 36 most recent Fama-French factor rows
    - month
        Month number of the last ranking pass (-1 before the first one)
    """
    month = -1
    shown = False

    def __init__(self):
        # Mutable containers are created per instance; as class attributes they
        # would be shared (and mutated) across every instance of the model.
        self.symbol_data = {}
        self.fama_french_factors = pd.DataFrame()

    def Update(self, algorithm, slice):
        """
        Ingests new Fama-French data and, on the first call of each calendar
        month outside of warm-up, emits the monthly long/short insights.
        Inputs:
        - algorithm
            The algorithm instance (read for `IsWarmingUp` and `Time`)
        - slice
            The current data slice
        Returns an empty list when nothing is emitted, otherwise the result of
        `Insight.Group` over the long and short insights.
        """
        # If we have a new month of fama french data, update our df
        if slice.ContainsKey('FF'):
            self.update_ff(slice['FF'])

        if algorithm.IsWarmingUp:
            return []

        # Only update insights at the start of every month
        if algorithm.Time.month == self.month:
            return []
        self.month = algorithm.Time.month

        # Rank symbols by score, best first. Scores that are None or not finite
        # (NaN/inf) are dropped: they cannot be ordered meaningfully.
        ranked = sorted(
            (s for s in self.symbol_data.values()
             if s.score is not None and np.isfinite(s.score)),
            key=lambda s: s.score,
            reverse=True,
        )

        # If the universe is too small to grab 10%, do nothing.
        num_passed = len(ranked) // 10
        if num_passed == 0:
            return []

        period = timedelta(days=30)
        # Long the top 10% of symbols
        insights = [
            Insight(s.Symbol, period, InsightType.Price, InsightDirection.Up)
            for s in ranked[:num_passed]
        ]
        # Short the bottom 10% of symbols
        insights += [
            Insight(s.Symbol, period, InsightType.Price, InsightDirection.Down)
            for s in ranked[-num_passed:]
        ]
        return Insight.Group(insights)

    def OnSecuritiesChanged(self, algorithm, changes):
        """
        Called each time our universe has changed. For every security added to
        the universe, we request history over the 36 full-month lookback window.
        Symbols with complete history get a `ResidualMomentum` object (which sets
        up its own monthly consolidator), stored in the `symbol_data` dictionary.
        Removed securities have their consolidator and entry removed.
        Inputs:
        - algorithm
            The algorithm instance
        - changes
            The security additions and removals
        """
        if len(changes.AddedSecurities) > 0:
            # Get 36 month rolling window dates
            start_lookback, end_lookback = self.get_lookback_dates(algorithm.Time)

            # Get history of symbols over lookback window
            added_symbols = [x.Symbol for x in changes.AddedSecurities]
            history = algorithm.History(added_symbols, start_lookback, end_lookback)

            # Ensure at least 36 months of history. An empty result cannot be
            # inspected through `index.levels`, so treat it as "not enough".
            if history.empty:
                enough_history = False
            else:
                times = history.index.levels[1]
                enough_history = (times[-1] - times[0]).days / 30 >= 36

            if enough_history:
                # Get monthly returns of symbols that have complete history
                monthly_returns = self.get_monthly_returns(changes.AddedSecurities, history, algorithm)

                # Determine which symbols passed the null-history check
                passed_symbols = [s for s in added_symbols if str(s.ID) in monthly_returns.columns]
                for added in passed_symbols:
                    # Create residual momentum indicator for this symbol
                    ret = monthly_returns[[str(added.ID)]].iloc[-36:].copy()
                    self.symbol_data[added] = ResidualMomentum(added, ret, algorithm, self)

        for removed in changes.RemovedSecurities:
            if removed.Symbol not in self.symbol_data:
                continue
            # Remove consolidator
            c = self.symbol_data[removed.Symbol].consolidator
            algorithm.SubscriptionManager.RemoveConsolidator(removed.Symbol, c)
            # Delete residual momentum indicator
            self.symbol_data.pop(removed.Symbol, None)

    def get_lookback_dates(self, time):
        """
        Calculates the start and end dates for the 36 full-month rolling lookback window.
        Inputs:
        - time
            The current algorithm time
        Returns 2 datetime objects, `start_date` and `end_date`.
        """
        # Step back to the first day of the previous month
        algo_time = time.replace(day=1)
        if algo_time.month == 1:
            algo_time = algo_time.replace(year=algo_time.year-1, month=12)
        else:
            algo_time = algo_time.replace(month=algo_time.month-1)
        # End of the lookback window (the end of the previous month)
        end_date = Expiry.EndOfMonth(algo_time)

        # Start of the lookback window: first day of the month three years earlier
        start_date = datetime(end_date.year - 3, end_date.month, 1)
        return start_date, end_date

    def get_monthly_returns(self, securities, history, algo):
        """
        Calculates the monthly returns for the securities found in a history
        DataFrame. Securities with null history are omitted.
        Inputs:
        - securities
            List of security objects (kept for interface compatibility; the
            securities processed are those present in `history`)
        - history
            DataFrame containing the historical prices of some securities
        - algo
            The algorithm instance (unused)
        Returns a DataFrame containing the monthly returns for each security,
        one row per month (indexed by the first day of that month).
        """
        # Must not have null history
        duration_filter = ~history['close'].unstack(level=0).isnull().any()
        duration_filter = duration_filter[duration_filter].index
        if len(duration_filter) == 0:
            return pd.DataFrame()
        history = history.loc[duration_filter].copy()

        # Roll back the timestamp of our history DataFrame by one day
        history = history.unstack(level=0)
        shifted_index = history.index.map(lambda x: x - timedelta(days=1))
        history = history.set_index(shifted_index).stack().swaplevel()

        # Determine monthly start and end trading days
        month_start_end = []
        seen_months = set()
        idx = history.index.levels[1]
        for i, day in enumerate(idx):
            start_month = datetime(day.year, day.month, 1)
            if start_month not in seen_months:
                seen_months.add(start_month)
                if i > 0:
                    month_start_end[-1].append(idx[i-1])  # Previous month end date
                month_start_end.append([day])  # Current month start date
            if i == len(idx) - 1:
                month_start_end[-1].append(day)  # Latest date in history DataFrame

        # Build one single-row frame per month and concatenate once at the end
        # (DataFrame.append is no longer available in current pandas).
        rows = []
        for first_day, last_day in month_start_end:
            returns_per_sec = {}
            for sec in history.index.levels[0]:
                # `levels` may still list securities dropped by the filter above
                if sec not in history.index:
                    continue
                sec_history = history.loc[sec]
                open_price = sec_history.loc[first_day]['open']
                close_price = sec_history.loc[last_day]['close']
                returns_per_sec[sec] = [(close_price - open_price) / open_price]
            month_label = datetime(first_day.year, first_day.month, 1)
            rows.append(pd.DataFrame(returns_per_sec, index=[month_label]))

        if not rows:
            return pd.DataFrame()
        return pd.concat(rows)

    def update_ff(self, data):
        """
        Updates the fama and french DataFrame with the latest data, keeping at
        most the 36 most recent rows.
        Inputs:
        - data
            PythonData object containing the ff data
        """
        row = pd.DataFrame({"Mkt" : [data.GetProperty('Mkt')],
                            "SMB" : [data.GetProperty('SMB')],
                            "HML" : [data.GetProperty('HML')],
                            "Rf" : [data.GetProperty('Rf')]},
                           index=[data.Time])

        if self.fama_french_factors.empty:
            combined = row
        else:
            combined = pd.concat([self.fama_french_factors, row])
            # The same FF bar can reach this method more than once (Update may
            # be invoked repeatedly for one slice), so rows are keyed by
            # timestamp and only the latest copy of each is kept.
            combined = combined[~combined.index.duplicated(keep='last')]
        self.fama_french_factors = combined.iloc[-36:].copy()
class ResidualMomentum:
    """
    This class manages a rolling window of the previous 36 full months of
    returns for one symbol. It gathers its data via a consolidator into monthly
    bars. Every month, it calculates a score based on the residual returns over
    the previous 12 full months (excluding the latest month) (t-12 - t-2). The
    model is fit to the returns over the previous 36 full months (t-36 - t-1).
    """
    def __init__(self, symbol, monthly_returns, algorithm, alpha):
        """
        Inputs:
        - symbol
            The symbol this indicator tracks
        - monthly_returns
            Single-column DataFrame of historical monthly returns; its column is
            renamed in place to 'm_return'
        - algorithm
            The algorithm instance
        - alpha
            The alpha model that owns the `fama_french_factors` DataFrame
        """
        self.Symbol = symbol
        self.monthly_returns = monthly_returns
        self.monthly_returns.columns = ['m_return']
        self.score = None
        self.algo = algorithm
        self.alpha = alpha
        self.first = True

        # Setup monthly consolidation
        self.consolidator = TradeBarConsolidator(self.CustomMonthly)
        self.consolidator.DataConsolidated += self.CustomMonthlyHandler
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator)

    def update_score(self):
        """
        Updates the score for the Residual Momentum indicator. If the price is <$1,
        we assign it a score of None and it won't be considered for insights for
        the time being. The score is determined by fitting a regression model of
        the monthly returns on the fama french factors over the whole rolling
        window. Once the model is fit, the residuals over the previous 12 months,
        excluding the most recent month (t-12 to t-2), are summed and divided by
        their standard deviation.

        The score is also set to None when the regression cannot be evaluated:
        the factor and return windows differ in length, fewer than 12 months
        are available, a return is not finite, or the residuals have no spread.
        """
        # If current price < $1, don't bother calculating the score
        if self.algo.CurrentSlice[self.Symbol].Close < 1:
            self.score = None
            return

        factors = self.alpha.fama_french_factors
        # 1-D array of returns; a (n, 1) array would broadcast against the 1-D
        # predictions below and silently produce an n x n matrix of differences.
        returns = np.asarray(self.monthly_returns['m_return'], dtype=float)

        # The regression pairs rows by position, so both windows must line up
        if len(factors) != len(returns) or len(returns) < 12:
            self.score = None
            return
        if not np.isfinite(returns).all():
            self.score = None
            return

        # Fit regression model over the rolling window
        X = sm.add_constant(factors)
        model = sm.OLS(returns, X).fit()

        # Calculate score on the previous 12 months, excluding the most recent month
        # (t-12 - t-2)
        pred = np.asarray(model.predict(X.iloc[-12:-1]), dtype=float).ravel()
        residual_returns = returns[-12:-1] - pred

        volatility = residual_returns.std()
        if not np.isfinite(volatility) or volatility == 0:
            self.score = None
            return
        self.score = residual_returns.sum() / volatility

    def CustomMonthly(self, dt):
        '''Custom Monthly Func: returns the calendar month containing `dt`.'''
        start = dt.replace(day=1).date()
        # Day 28 plus four days always lands in the following month
        end = dt.replace(day=28) + timedelta(4)
        # Step back to the first day of that following month
        end = (end - timedelta(end.day-1)).date()
        return CalendarInfo(start, end - start)

    def CustomMonthlyHandler(self, sender, consolidated):
        """
        Updates the monthly returns rolling window then the score.
        Inputs
        - sender
        - consolidated
            Tradebar representing the latest completed month
        """
        # Return relative to the month's open, matching how the history-seeded
        # monthly returns are computed. A zero open yields NaN so the window
        # keeps one row per month; update_score then leaves the score unset.
        open_price = consolidated.Open
        if open_price == 0:
            monthly_return = float('nan')
        else:
            monthly_return = (consolidated.Close - open_price) / open_price

        # Append to monthly returns rolling window DataFrame
        row = pd.DataFrame({'m_return' : [monthly_return]}, index=[consolidated.Time])
        self.monthly_returns = pd.concat([self.monthly_returns, row]).iloc[-36:].copy()
        self.update_score()


# NOTE(review): this import was fused onto the previous line in the original
# paste; it presumably begins a separate main algorithm file - confirm.
from ResidualMomentumAlpha import ResidualMomentumAlphaModel
class NadionParticleAntennaArray(QCAlgorithm):
    """
    Framework algorithm that trades the insights of `ResidualMomentumAlphaModel`
    with equal weighting and immediate execution, on a universe that is
    re-selected once per calendar month.
    """
    def Initialize(self):
        """Configures dates, cash, data, warm-up, universe and framework models."""
        self.SetStartDate(2017, 1, 1)
        self.SetCash(100000)

        # Add Fama French data
        self.AddData(FF, "FF", Resolution.Daily)

        self.SetWarmUp(timedelta(weeks=52*3))

        # The universe is refreshed on the first selection pass of each new
        # month; `month` remembers the month of the last refresh.
        self.month = -1
        self.coarse_size = 400
        self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction,
                                                                        self.FineSelectionFunction, None, None))
        self.UniverseSettings.Resolution = Resolution.Daily

        self.AddAlpha(ResidualMomentumAlphaModel())
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.SetExecution(ImmediateExecutionModel())

    def OnData(self, data):
        """
        Intentionally does nothing.

        The alpha model registered with AddAlpha is driven by the framework,
        which passes every slice (including the "FF" custom data) to its
        Update method. Calling Update again from here discarded the returned
        insights and only fed the same Fama-French row to the model a second
        time.
        """

    def CoarseSelectionFunction(self, coarse):
        """
        Returns up to `coarse_size` symbols that have fundamental data, or
        `Universe.Unchanged` when the universe was already refreshed this month.
        """
        if self.month == self.Time.month:
            return Universe.Unchanged
        with_fundamentals = [x.Symbol for x in coarse if x.HasFundamentalData]
        return with_fundamentals[:self.coarse_size]

    def FineSelectionFunction(self, fine):
        """
        Records the current month as refreshed and returns the symbols of the
        top 10% of `fine` by market cap.
        """
        self.month = self.Time.month

        # Select the top 10%, based on market cap
        sorted_mkt_cap = sorted(fine, key=lambda x: x.MarketCap, reverse=True)
        universe_size = len(sorted_mkt_cap) // 10  # top 10%
        return [x.Symbol for x in sorted_mkt_cap[:universe_size]]
class FF(PythonData):
    """
    Custom data type for the Fama-French research factors CSV. Each parsed row
    carries the properties "Mkt", "SMB", "HML" and "Rf".
    """
    def GetSource(self, config, date, isLiveMode):
        """Returns the remote CSV file as the subscription data source."""
        source = "https://github.com/QuantConnect/Tutorials/raw/feature-data-directory/Data/F-F_Research_Data_Factors.csv"
        return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLive):
        """
        Parses one CSV line into an FF object.
        Inputs:
        - config
            Subscription configuration (its Symbol is assigned to the result)
        - line
            Raw text line; expected layout is YYYYMM,Mkt,SMB,HML,Rf
        - date, isLive
            Unused
        Returns the populated FF object, or None for lines that are blank, do
        not start with a digit, or cannot be parsed.
        """
        # If first character is not digit, pass
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')
        try:
            ff = FF()
            ff.Symbol = config.Symbol
            # Strip the date field: strptime rejects surrounding whitespace
            ff.Time = datetime.strptime(fields[0].strip(), '%Y%m')
            ff.SetProperty("Mkt", float(fields[1]))
            ff.SetProperty("SMB", float(fields[2]))
            ff.SetProperty("HML", float(fields[3]))
            ff.SetProperty("Rf", float(fields[4]))
            return ff
        except (ValueError, IndexError):
            # Skip rows whose date or factor values do not parse (ValueError)
            # or that have fewer than five columns (IndexError)
            return None