| Overall Statistics |
|
Total Trades 37 Average Win 0.22% Average Loss -0.67% Compounding Annual Return -3.291% Drawdown 3.800% Expectancy -0.084 Net Profit -0.821% Sharpe Ratio -0.437 Probabilistic Sharpe Ratio 19.357% Loss Rate 31% Win Rate 69% Profit-Loss Ratio 0.33 Alpha -0.015 Beta 0.066 Annual Standard Deviation 0.05 Annual Variance 0.003 Information Ratio 0.512 Tracking Error 0.174 Treynor Ratio -0.333 Total Fees $42.38 Estimated Strategy Capacity $19000000.00 Lowest Capacity Asset VFH SVS2QA8SPHET |
#region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
#endregion
sector_dict = {
"BasicMaterials":MorningstarSectorCode.BasicMaterials,
"ConsumerCyclical":MorningstarSectorCode.ConsumerCyclical,
"FinancialServices":MorningstarSectorCode.FinancialServices,
"RealEstate":MorningstarSectorCode.RealEstate,
"ConsumerDefensive":MorningstarSectorCode.ConsumerDefensive,
"Healthcare":MorningstarSectorCode.Healthcare,
"Utilities":MorningstarSectorCode.Utilities,
"CommunicationServices":MorningstarSectorCode.CommunicationServices,
"Energy":MorningstarSectorCode.Energy,
"Industrials":MorningstarSectorCode.Industrials,
"Technology":MorningstarSectorCode.Technology
}
# Refer to https://www.sectorspdr.com/
sector_etf = {
"BasicMaterials":'XLB',
"ConsumerCyclical":'XLY',
"FinancialServices":'XLF',
"RealEstate":'XLRE',
"ConsumerDefensive":'XLP',
"Healthcare":'XLV',
"Utilities":'XLU',
"CommunicationServices":'XLC',
"Energy":'XLE',
"Industrials":'XLI',
"Technology":'XLK'
}
# Your New Python File
class SingleFactorAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2010, 1, 1) # Set Start Date
self.SetEndDate(2023, 2, 28) # Set End Date
self.SetCash(100000) # Set Starting Cash Balance
self.UniverseSettings.Leverage = 2 # Tunable parameter: Available leverage
# Rebalancing not yet implemented
# Will prefer shorter rebalancing period, e.g. weekly
self.rebalance_days = 5 # Tunable parameter: Rebalancing frequency in days
self.nextRebalance = self.Time # Set the next rebalancing time
self.lookback = 61 # Tunable parameter: Historical data length in days
self.num_equities = 30 # Tunable parameter: Equities universe, proxy for SPX500
self.UniverseUpdateFrequency = 90 # Tunable parameter: Universe update frequency in days
self.nextUpdate = self.Time
self.UniverseSettings.Resolution = Resolution.Daily # Daily resolution data
# Select the sector
self.sector = "FinancialServices" # Tunable parameter: Sector used for backtesting
self.sectorCode = self.AddEquity(sector_etf[self.sector], Resolution.Daily).Symbol
# Add pair(s) to regress on
# self.AddEquity("SPY", Resolution.Daily)
self.AddUniverse(self.CoarseFilterFunction, self.FineFilterFunction)
def CoarseFilterFunction(self, coarse: List[CoarseFundamental]):
# Conduct upgrade only in certain interval
if self.Time < self.nextUpdate:
return [c.Symbol for c in coarse]
# First filter: Return US equities that has fundamental data
return [x.Symbol for x in coarse if x.HasFundamentalData]
def FineFilterFunction(self, fine):
# Filter for the industry
if self.Time < self.nextUpdate:
return [f.Symbol for f in fine]
# Second filter: Return top X [num_equities] US equities in sector, by MarketCap
selected = sorted([x for x in fine if x.AssetClassification.MorningstarSectorCode == sector_dict[self.sector]],
key=lambda x: x.MarketCap, reverse=True)
symbols = [x.Symbol for x in selected[:self.num_equities]]
# Update the next filter time // second testing point
# self.nextUpdate = self.Time + timedelta(days=self.UniverseUpdateFrequency)
return symbols
def OnData(self, data):
# Do nothing until next rebalance
if self.Time < self.nextRebalance:
return
# Get historical data of the selected symbols
history = self.History(list(self.ActiveSecurities.Keys), self.lookback, Resolution.Daily).close.unstack(level=0)
# Get the weights
s_scores, betas = self.GetScores(history)
weights = self.GetWeights(s_scores, betas, data)
for symbol, weight in weights.items():
self.SetHoldings(symbol, weight, tag = str(symbol.Value)+": "+str(weight))
# Set next rebalancing date
self.nextRebalance = self.Time + timedelta(days=self.rebalance_days)
def GetScores(self, history):
# Get the weights according to OU regression models
returns = self.GetNormalizedReturns(history)
# To store the ou params, s scores and betas for mean reversion trades
betas = pd.DataFrame(index=returns.drop(self.sectorCode, axis=1).columns,
columns=[self.sectorCode])
ou_parameters = pd.DataFrame(index=returns.drop(self.sectorCode, axis=1).columns,
columns=['a', 'b', 'Var(zeta)', 'kappa', 'm', 'sigma', 'sigma_eq'])
for stock in returns.drop(self.sectorCode, axis=1).columns:
X = returns[[self.sectorCode]].values
y = returns[stock].values
# First regression to filter out beta
model1 = LinearRegression().fit(X,y)
betas.loc[self.Symbol(stock)] = model1.coef_
epsilon = y - model1.predict(X)
# Second regression for predicting beta
cum_epsilon = epsilon.cumsum()
X = cum_epsilon[:-1].reshape(-1,1)
y = cum_epsilon[1:]
model2 = LinearRegression().fit(X,y)
a = model2.intercept_
b = model2.coef_
zeta = y - model2.predict(X)
# OU parameters
kappa = -np.log(b)*252
m = a/(1-b)
sigma = np.sqrt(np.var(zeta)*2*kappa/(1-b**2))
sigma_eq = np.sqrt(np.var(zeta)/(1-b**2))
# if the speed of mean reversion is high enough, save the calculated parameters
# Tunable parameter: kappa threshold - 252/30 for 1.5 months mean reversion,
if kappa > 252/30:
ou_parameters.loc[stock] = [x.item() for x in [a,b,np.var(zeta),kappa,m,sigma,sigma_eq]]
ou_parameters.dropna(axis=0, inplace=True)
# calculate s-score
ou_parameters['m_bar'] = (ou_parameters['a']/(1 - ou_parameters['b']) -
ou_parameters['a'].mean()/(1-ou_parameters['b'].mean()))
ou_parameters['s'] = -ou_parameters['m_bar'] / ou_parameters['sigma_eq']
s_scores = ou_parameters['s'].to_dict()
# Trivial return
return s_scores, betas
def GetNormalizedReturns(self, df):
temp = df.dropna(axis=1).pct_change().dropna()
# Minor formatting for parsing symbols
temp.columns = [self.Symbol(x) for x in temp.columns]
temp_mean = temp.mean()
temp_std = temp.std()
return ((temp-temp_mean)/(temp_std))
def GetWeights(self, s_scores, betas, data):
# Figure out which stocks to long/short/liquidiate positions
# Tunable parameter: s_score threshold to long/short/liquidate
position = {}
for sym, s in s_scores.items():
if s > 1.25: # Long
position[sym] = 1
elif s < -1.25: # Short
position[sym] = -1
elif s < 0.75 and self.Portfolio[sym].Invested: # Liquidate Long
position[sym] = 0
elif s > -0.75 and self.Portfolio[sym].Invested: # Liquidate Short
position[sym] = 0
else: # Otherwise no update needed, retain current position
curr_quant = self.Portfolio[sym].Quantity
if curr_quant > 0:
position[sym] = 1
elif curr_quant < 0:
position[sym] = -1
else:
position[sym] = 0
# Allocate equal weight among long and short position
position_weights = dict()
# Find total# of position
long_sum = sum([position[x] for x in position if position[x] > 0])
short_sum = sum([position[x] for x in position if position[x] < 0])
# Tunable parameters: How to assign weight based on s_score threshold
for sym, weight in position.items():
formal_symbol = self.Symbol(str(sym))
if weight > 0:
position_weights[formal_symbol] = weight / (abs(long_sum)+abs(short_sum))
elif weight < 0:
position_weights[formal_symbol] = -weight / (abs(long_sum)+abs(short_sum))
else:
position_weights[formal_symbol] = 0
# Calculate hedging beta
hedging_weights = {self.sectorCode:0}
for sym, pos_w in position_weights.items():
hedging_weights[self.sectorCode] += (-betas.loc[sym, self.sectorCode] * pos_w)
return {**position_weights,**hedging_weights}
def OnSecuritiesChanged(self, changes):
# Liquidate when the symbols got removed from the universe
for security in changes.RemovedSecurities:
if security.Invested:
self.Liquidate(security.Symbol, 'Removed from Universe')#region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
#endregion
# Your New Python File
class PCAAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2022, 1, 1) # Set Start Date
self.SetEndDate(2022, 3, 31) # Set End Date
self.SetCash(100000) # Set Starting Cash Balance
self.UniverseSettings.Leverage = 2 # Tunable parameter: Available leverage
# Rebalancing not yet implemented
# Will prefer shorter rebalancing period, e.g. weekly
self.rebalance_days = 5 # Tunable parameter: Rebalancing frequency in days
self.nextRebalance = self.Time # Set the next rebalancing time
self.lookback = 61 # Tunable parameter: Historical data length in days, +1 to get percentage return
self.num_equities = 100 # Tunable parameter: Equities universe, proxy for SPX500
self.UniverseUpdateFrequency = 90 # Tunable parameter: Universe update frequency in days
self.nextUpdate = self.Time
self.UniverseSettings.Resolution = Resolution.Daily # Daily resolution data
self.AddUniverse(self.CoarseFilterFunction, self.FineFilterFunction)
self.num_pc = 2 # Tunable parameter: Number of principal components used
def CoarseFilterFunction(self, coarse: List[CoarseFundamental]):
# Conduct upgrade only in certain interval
if self.Time < self.nextUpdate:
return Universe.Unchanged
# First filter: Return US equities that has fundamental data
return [x.Symbol for x in coarse if x.HasFundamentalData]
def FineFilterFunction(self, fine):
# Filter for the industry
if self.Time < self.nextUpdate:
return Universe.Unchanged
selected = sorted([x for x in fine if((
self.Time - x.SecurityReference.IPODate).days > self.lookback * 2)],
key=lambda x:x.MarketCap, reverse=True)
symbols = [x.Symbol for x in selected[:self.num_equities]]
self.nextUpdate = self.Time + timedelta(days=self.UniverseUpdateFrequency)
return symbols
def OnData(self, data):
# Do nothing until next rebalance
if self.Time < self.nextRebalance:
return
# Get historical data of the selected symbols
history = self.History(list(self.ActiveSecurities.Keys), self.lookback, Resolution.Daily).close.unstack(level=0)
# Get the weights
s_scores, betas, EigenPortfolio = self.GetScores(history)
weights = self.GetWeights(s_scores, betas, EigenPortfolio, data)
for symbol, weight in weights.items():
self.SetHoldings(symbol, weight, tag = str(symbol.Value)+": "+str(weight))
# Set next rebalancing date
self.nextRebalance = self.Time + timedelta(days=self.rebalance_days)
def GetScores(self, history):
# Get the weights according to OU regression models
returns = self.GetNormalizedReturns(history)
evals, evecs = np.linalg.eigh(returns.corr())
# Avoid complex eigenvalues and eigenvectors
evals = np.real(evals)
evecs = np.real(evecs)
index = np.argsort(evals)[::-1]
EigenPortfolio_returns = pd.DataFrame(index=returns.index)
EigenPortfolio = pd.DataFrame(index=returns.columns)
for i in range(len(evals)):
EigenPortfolio_returns[f'eig{i}'] = 0
weight = evecs[:,index[i]]
weight /= abs(weight).sum() # Total abs(pos) sum to 1
EigenPortfolio[f'eig{i}'] = weight
for t in returns.index:
EigenPortfolio_returns.loc[t, f'eig{i}'] = (weight * returns.loc[t]).sum()
betas = pd.DataFrame(index=returns.columns,
columns=EigenPortfolio_returns[[f'eig{i}' for i in range(self.num_pc)]].columns)
ou_parameters = pd.DataFrame(index=returns.columns,
columns=['a', 'b', 'Var(zeta)', 'kappa', 'm', 'sigma', 'sigma_eq'])
for stock in returns.columns:
X = EigenPortfolio_returns[[f'eig{i}' for i in range(self.num_pc)]].values
y = returns[stock].values
# First regression to filter out beta
if(self.num_pc > 1):
model1 = Ridge().fit(X,y)
else:
model1 = LinearRegression().fit(X,y)
# model1 = LinearRegression().fit(X,y)
betas.loc[stock] = model1.coef_
epsilon = y - model1.predict(X)
# Second regression for predicting beta
cum_epsilon = epsilon.cumsum()
X = cum_epsilon[:-1].reshape(-1,1)
y = cum_epsilon[1:]
model2 = LinearRegression().fit(X,y)
a = model2.intercept_
b = model2.coef_
zeta = y - model2.predict(X)
# OU parameters
kappa = -np.log(b)*252
m = a/(1-b)
sigma = np.sqrt(np.var(zeta)*2*kappa/(1-b**2))
sigma_eq = np.sqrt(np.var(zeta)/(1-b**2))
# if the speed of mean reversion is high enough, save the calculated parameters
# 252/30 for 1.5 months mean reversion, can further tune this param
if kappa > 252/30:
ou_parameters.loc[stock] = [x.item() for x in [a,b,np.var(zeta),kappa,m,sigma,sigma_eq]]
ou_parameters.dropna(axis=0, inplace=True)
# calculate s-score
ou_parameters['m_bar'] = (ou_parameters['a']/(1 - ou_parameters['b']) -
ou_parameters['a'].mean()/(1-ou_parameters['b'].mean()))
ou_parameters['s'] = -ou_parameters['m_bar'] / ou_parameters['sigma_eq']
s_scores = ou_parameters['s'].to_dict()
# Return return the first n eigenportfolios needed
EigenPortfolio_result = EigenPortfolio[EigenPortfolio.columns[:self.num_pc]]
return s_scores, betas, EigenPortfolio_result
def GetNormalizedReturns(self, df):
temp_na = df.loc[:, df.isna().any()]
temp = df.dropna(axis=1).pct_change().dropna()
# Minor formatting for parsing symbols
temp.columns = [self.Symbol(x) for x in temp.columns]
temp_mean = temp.mean()
temp_std = temp.std()
return ((temp-temp_mean)/(temp_std))
def GetWeights(self, s_scores, betas, EigenPortfolio, data):
# Figure out which stocks to long/short/liquidiate positions
# Tunable parameter: s_score threshold to long/short/close
position = dict()
for sym in self.ActiveSecurities.Keys:
# in s_scores: kappa test passed
if sym in s_scores:
s = s_scores[sym]
# Check if position closing needed
if self.Portfolio[sym].Invested:
if s < 0.75: # Tunable parameter: Close Long
position[sym] = 0
elif s > -0.75: # Tunable parameter: Close Short
position[sym] = 0
elif self.Portfolio[sym].Quantity > 0: # Remain Long
position[sym] = 1
elif self.Portfolio[sym].Quantity < 0: # Remain Short
position[sym] = -1
else:
position[sym] = 0
else:
if s > 1.25: # Tunable parameter: Open Long
position[sym] = 1
elif s < -1.25: # Tunable parameter: Open Short
position[sym] = -1
else:
position[sym] = 0
# Not in s_scores - kappa test not passed
else:
position[sym] = 0
# Allocate equal weight among long and short position
position_weights = dict()
# Find total# of position
long_sum = sum([position[x] for x in position if position[x] > 0])
short_sum = sum([position[x] for x in position if position[x] < 0])
# Calculate total exposure inc beta hedging
total_exposure = 0
for sym, weight in position.items():
formal_symbol = self.Symbol(str(sym))
if weight > 0 or weight < 0:
exp = abs(weight)
for code in EigenPortfolio.columns:
exp += abs(float(betas.loc[sym, code]))
total_exposure += exp
# Tunable parameters: How to assign weight based on s_score threshold
for sym, weight in position.items():
formal_symbol = self.Symbol(str(sym))
if weight > 0:
position_weights[formal_symbol] = 1 / total_exposure
elif weight < 0:
position_weights[formal_symbol] = -1 / total_exposure
else:
position_weights[formal_symbol] = 0
# Calculate hedging beta
hedging_weights = dict()
for stock in EigenPortfolio.index:
hedging_weights[stock] = 0
for sym, pos_w in position_weights.items():
for code in EigenPortfolio.columns:
port_weight = -betas.loc[sym, code] * pos_w
for stock in EigenPortfolio.index:
hedging_weights[stock] += EigenPortfolio.loc[stock, code] * port_weight
combined = {i: position_weights.get(i, 0) + hedging_weights.get(i, 0)
for i in set(position_weights).union(hedging_weights)}
return combined
def OnSecuritiesChanged(self, changes):
# Liquidate when the symbols got removed from the universe
for security in changes.RemovedSecurities:
if security.Invested:
self.Liquidate(security.Symbol, 'Removed from Universe')#region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
#endregion
sector_dict = {
"BasicMaterials":MorningstarSectorCode.BasicMaterials,
"ConsumerCyclical":MorningstarSectorCode.ConsumerCyclical,
"FinancialServices":MorningstarSectorCode.FinancialServices,
"RealEstate":MorningstarSectorCode.RealEstate,
"ConsumerDefensive":MorningstarSectorCode.ConsumerDefensive,
"Healthcare":MorningstarSectorCode.Healthcare,
"Utilities":MorningstarSectorCode.Utilities,
"CommunicationServices":MorningstarSectorCode.CommunicationServices,
"Energy":MorningstarSectorCode.Energy,
"Industrials":MorningstarSectorCode.Industrials,
"Technology":MorningstarSectorCode.Technology
}
# Tunable parameter: Symbol used to get systematic factor
# Refer to https://www.sectorspdr.com/
sector_etfs = {
"BasicMaterials":['XLB'],
"ConsumerCyclical":['XLY'],
"FinancialServices":['XLF', 'VFH'],
"RealEstate":['XLRE'],
"ConsumerDefensive":['XLP'],
"Healthcare":['XLV'],
"Utilities":['XLU'],
"CommunicationServices":['XLC'],
"Energy":['XLE'],
"Industrials":['XLI'],
"Technology":['XLK']
}
# Your New Python File
class MultiFactorAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2010, 1, 1) # Set Start Date
self.SetEndDate(2010, 3, 31) # Set End Date
self.SetCash(100000) # Set Starting Cash Balance
self.UniverseSettings.Leverage = 2 # Tunable parameter: Available leverage
# Rebalancing not yet implemented
# Will prefer shorter rebalancing period, e.g. weekly
self.rebalance_days = 5 # Tunable parameter: Rebalancing frequency in days
self.nextRebalance = self.Time # Set the next rebalancing time
self.lookback = 61 # Tunable parameter: Historical data length in days
self.num_equities = 30 # Tunable parameter: Equities universe, proxy for SPX500
self.UniverseUpdateFrequency = 90 # Tunable parameter: Universe update frequency in days
self.nextUpdate = self.Time
self.UniverseSettings.Resolution = Resolution.Daily # Daily resolution data
# Select the sector
self.sector = "FinancialServices" # Tunable parameter: Sector used for backtesting
self.sectorCodes = list()
for etf in sector_etfs[self.sector]:
self.sectorCodes.append(self.AddEquity(etf, Resolution.Daily).Symbol)
# self.sectorCode = None #
# Add pair(s) to regress on
# self.AddEquity("SPY", Resolution.Daily)
self.AddUniverse(self.CoarseFilterFunction, self.FineFilterFunction)
def CoarseFilterFunction(self, coarse: List[CoarseFundamental]):
# Conduct upgrade only in certain interval
if self.Time < self.nextUpdate:
return [c.Symbol for c in coarse]
# First filter: Return US equities that has fundamental data
return [x.Symbol for x in coarse if x.HasFundamentalData]
def FineFilterFunction(self, fine):
# Filter for the industry
if self.Time < self.nextUpdate:
return [f.Symbol for f in fine]
# Second filter: Return top X [num_equities] US equities in sector, by MarketCap
selected = sorted([x for x in fine if x.AssetClassification.MorningstarSectorCode == sector_dict[self.sector]],
key=lambda x: x.MarketCap, reverse=True)
symbols = [x.Symbol for x in selected[:self.num_equities]]
# Update the next filter time // second testing point
# self.nextUpdate = self.Time + timedelta(days=self.UniverseUpdateFrequency)
return symbols
def OnData(self, data):
# Do nothing until next rebalance
if self.Time < self.nextRebalance:
return
# Get historical data of the selected symbols
history = self.History(list(self.ActiveSecurities.Keys), self.lookback, Resolution.Daily).close.unstack(level=0)
# Get the weights
s_scores, betas = self.GetScores(history)
weights = self.GetWeights(s_scores, betas, data)
for symbol, weight in weights.items():
self.SetHoldings(symbol, weight, tag = str(symbol.Value)+": "+str(weight))
# Set next rebalancing date
self.nextRebalance = self.Time + timedelta(days=self.rebalance_days)
def GetScores(self, history):
# Get the weights according to OU regression models
returns = self.GetNormalizedReturns(history)
# To store the ou params, s scores and betas for mean reversion trades
betas = pd.DataFrame(index=returns.drop(self.sectorCodes, axis=1).columns,
columns=[self.sectorCodes])
ou_parameters = pd.DataFrame(index=returns.drop(self.sectorCodes, axis=1).columns,
columns=['a', 'b', 'Var(zeta)', 'kappa', 'm', 'sigma', 'sigma_eq'])
for stock in returns.drop(self.sectorCodes, axis=1).columns:
X = returns[self.sectorCodes].values
y = returns[stock].values
# First regression to filter out beta
# Tunable parameter: Use ridge regression, additional regularization term
# For datasets with highly correlated dependent variables
if(len(self.sectorCodes) > 1):
model1 = Ridge().fit(X,y)
else:
model1 = LinearRegression().fit(X,y)
betas.loc[self.Symbol(stock)] = model1.coef_
epsilon = y - model1.predict(X)
# Second regression for predicting beta
cum_epsilon = epsilon.cumsum()
X = cum_epsilon[:-1].reshape(-1,1)
y = cum_epsilon[1:]
model2 = LinearRegression().fit(X,y)
a = model2.intercept_
b = model2.coef_
zeta = y - model2.predict(X)
# OU parameters
kappa = -np.log(b)*252
m = a/(1-b)
sigma = np.sqrt(np.var(zeta)*2*kappa/(1-b**2))
sigma_eq = np.sqrt(np.var(zeta)/(1-b**2))
# if the speed of mean reversion is high enough, save the calculated parameters
# Tunable parameter: kappa threshold - 252/30 for 1.5 months mean reversion,
if kappa > 252/30:
ou_parameters.loc[stock] = [x.item() for x in [a,b,np.var(zeta),kappa,m,sigma,sigma_eq]]
ou_parameters.dropna(axis=0, inplace=True)
# calculate s-score
ou_parameters['m_bar'] = (ou_parameters['a']/(1 - ou_parameters['b']) -
ou_parameters['a'].mean()/(1-ou_parameters['b'].mean()))
ou_parameters['s'] = -ou_parameters['m_bar'] / ou_parameters['sigma_eq']
s_scores = ou_parameters['s'].to_dict()
# Trivial return
return s_scores, betas
def GetNormalizedReturns(self, df):
temp = df.dropna(axis=1).pct_change().dropna()
# Minor formatting for parsing symbols
temp.columns = [self.Symbol(x) for x in temp.columns]
temp_mean = temp.mean()
temp_std = temp.std()
return ((temp-temp_mean)/(temp_std))
def GetWeights(self, s_scores, betas, data):
# Figure out which stocks to long/short/liquidiate positions
# Tunable parameter: s_score threshold to long/short/close
position = dict()
for sym in self.ActiveSecurities.Keys:
# Pass if it is in sectorCodes - as benchmark etfs
if sym in self.sectorCodes:
continue
# in s_scores: kappa test passed
if sym in s_scores:
s = s_scores[sym]
# Check if position closing needed
if self.Portfolio[sym].Invested:
if s < 0.75: # Tunable parameter: Close Long
position[sym] = 0
elif s > -0.75: # Tunable parameter: Close Short
position[sym] = 0
elif self.Portfolio[sym].Quantity > 0: # Remain Long
position[sym] = 1
elif self.Portfolio[sym].Quantity < 0: # Remain Short
position[sym] = -1
else:
position[sym] = 0
else:
if s > 1.25: # Tunable parameter: Open Long
position[sym] = 1
elif s < -1.25: # Tunable parameter: Open Short
position[sym] = -1
else:
position[sym] = 0
# Not in s_scores - kappa test not passed
else:
position[sym] = 0
# for sym, s in s_scores.items():
# if s > 1.25: # Long
# position[sym] = 1
# elif s < -1.25: # Short
# position[sym] = -1
# elif s < 0.75 and self.Portfolio[sym].Invested: # Liquidate Long
# position[sym] = 0
# elif s > -0.75 and self.Portfolio[sym].Invested: # Liquidate Short
# position[sym] = 0
# else: # Otherwise no update needed, retain current position
# curr_quant = self.Portfolio[sym].Quantity
# if curr_quant > 0:
# position[sym] = 1
# elif curr_quant < 0:
# position[sym] = -1
# else:
# position[sym] = 0
# Allocate equal weight among long and short position
position_weights = dict()
# Find total# of position
long_sum = sum([position[x] for x in position if position[x] > 0])
short_sum = sum([position[x] for x in position if position[x] < 0])
# Tunable parameters: How to assign weight based on s_score threshold
for sym, weight in position.items():
formal_symbol = self.Symbol(str(sym))
if weight > 0:
position_weights[formal_symbol] = weight / (abs(long_sum)+abs(short_sum))
elif weight < 0:
position_weights[formal_symbol] = weight / (abs(long_sum)+abs(short_sum))
else:
position_weights[formal_symbol] = 0
# Calculate hedging beta
hedging_weights = dict()
for code in self.sectorCodes:
hedging_weights[code] = 0
for sym, pos_w in position_weights.items():
for code in hedging_weights:
hedging_weights[code] += (-betas.loc[sym, code] * pos_w)
# hedging_weights = {self.sectorCode:0}
# for sym, pos_w in position_weights.items():
# hedging_weights[self.sectorCode] += (-betas.loc[sym, self.sectorCode] * pos_w)
return {**position_weights,**hedging_weights}
def OnSecuritiesChanged(self, changes):
# Liquidate when the symbols got removed from the universe
for security in changes.RemovedSecurities:
if security.Invested:
self.Liquidate(security.Symbol, 'Removed from Universe')#region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
#endregion
sector_dict = {
"BasicMaterials":MorningstarSectorCode.BasicMaterials,
"ConsumerCyclical":MorningstarSectorCode.ConsumerCyclical,
"FinancialServices":MorningstarSectorCode.FinancialServices,
"RealEstate":MorningstarSectorCode.RealEstate,
"ConsumerDefensive":MorningstarSectorCode.ConsumerDefensive,
"Healthcare":MorningstarSectorCode.Healthcare,
"Utilities":MorningstarSectorCode.Utilities,
"CommunicationServices":MorningstarSectorCode.CommunicationServices,
"Energy":MorningstarSectorCode.Energy,
"Industrials":MorningstarSectorCode.Industrials,
"Technology":MorningstarSectorCode.Technology
}
# Tunable parameter: Symbol used to get systematic factor
# Refer to https://www.sectorspdr.com/
sector_etfs = {
"BasicMaterials":['XLB'],
"ConsumerCyclical":['XLY'],
"FinancialServices":['XLF', 'VFH', 'SPY'],
"RealEstate":['XLRE'],
"ConsumerDefensive":['XLP'],
"Healthcare":['XLV'],
"Utilities":['XLU'],
"CommunicationServices":['XLC'],
"Energy":['XLE'],
"Industrials":['XLI'],
"Technology":['XLK']
}
# Your New Python File
class MultiFactorBetaHedgeAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2022, 1, 1) # Set Start Date
self.SetEndDate(2022, 3, 31) # Set End Date
self.SetCash(100000) # Set Starting Cash Balance
self.UniverseSettings.Leverage = 2 # Tunable parameter: Available leverage
# Rebalancing not yet implemented
# Will prefer shorter rebalancing period, e.g. weekly
self.rebalance_days = 30 # Tunable parameter: Rebalancing frequency in days
self.nextRebalance = self.Time # Set the next rebalancing time
self.lookback = 61 # Tunable parameter: Historical data length in days, +1 to get percentage return
self.num_equities = 30 # Tunable parameter: Equities universe, proxy for SPX500
self.UniverseUpdateFrequency = 90 # Tunable parameter: Universe update frequency in days
self.nextUpdate = self.Time
self.UniverseSettings.Resolution = Resolution.Daily # Daily resolution data
# Select the sector
self.sector = "FinancialServices" # Tunable parameter: Sector used for backtesting
self.sectorCodes = list()
for etf in sector_etfs[self.sector]:
self.sectorCodes.append(self.AddEquity(etf, Resolution.Daily).Symbol)
# Add pair(s) to regress on
# self.AddEquity("SPY", Resolution.Daily)
self.AddUniverse(self.CoarseFilterFunction, self.FineFilterFunction)
def CoarseFilterFunction(self, coarse: List[CoarseFundamental]):
# Conduct upgrade only in certain interval
if self.Time < self.nextUpdate:
return [c.Symbol for c in coarse]
# First filter: Return US equities that has fundamental data
return [x.Symbol for x in coarse if x.HasFundamentalData]
def FineFilterFunction(self, fine):
# Filter for the industry
if self.Time < self.nextUpdate:
return [f.Symbol for f in fine]
# Top X [num_equities] US equities in sector, by MarketCap
# Only include those with IPODate at least lookback * 2 times before current tick
# This is to avoid bugs in backtesting historical data
selected = sorted([x for x in fine if(
x.AssetClassification.MorningstarSectorCode == sector_dict[self.sector]
and (self.Time - x.SecurityReference.IPODate).days > self.lookback * 2)],
key=lambda x: x.MarketCap, reverse=True)
symbols = [x.Symbol for x in selected[:self.num_equities]]
# Update the next filter time
# self.nextUpdate = self.Time + timedelta(days=self.UniverseUpdateFrequency)
return symbols
def OnData(self, data):
# Do nothing until next rebalance
if self.Time < self.nextRebalance:
return
# Get historical data of the selected symbols
history = self.History(list(self.ActiveSecurities.Keys), self.lookback, Resolution.Daily).close.unstack(level=0)
# Get the weights
s_scores, betas = self.GetScores(history)
weights = self.GetWeights(s_scores, betas, data)
for symbol, weight in weights.items():
self.SetHoldings(symbol, weight, tag = str(symbol.Value)+": "+str(weight))
# Set next rebalancing date
self.nextRebalance = self.Time + timedelta(days=self.rebalance_days)
def GetScores(self, history):
# Get the weights according to OU regression models
returns = self.GetNormalizedReturns(history)
# To store the ou params, s scores and betas for mean reversion trades
betas = pd.DataFrame(index=returns.drop(self.sectorCodes, axis=1).columns,
columns=[self.sectorCodes])
ou_parameters = pd.DataFrame(index=returns.drop(self.sectorCodes, axis=1).columns,
columns=['a', 'b', 'Var(zeta)', 'kappa', 'm', 'sigma', 'sigma_eq'])
for stock in returns.drop(self.sectorCodes, axis=1).columns:
X = returns[self.sectorCodes].values
y = returns[stock].values
# First regression to filter out beta
# Tunable parameter: Use ridge regression, additional regularization term
# For datasets with highly correlated dependent variables
if(len(self.sectorCodes) > 1):
model1 = Ridge().fit(X,y)
else:
model1 = LinearRegression().fit(X,y)
betas.loc[self.Symbol(stock)] = model1.coef_
epsilon = y - model1.predict(X)
# Second regression for predicting beta
cum_epsilon = epsilon.cumsum()
X = cum_epsilon[:-1].reshape(-1,1)
y = cum_epsilon[1:]
model2 = LinearRegression().fit(X,y)
a = model2.intercept_
b = model2.coef_
zeta = y - model2.predict(X)
# OU parameters
kappa = -np.log(b)*252
m = a/(1-b)
sigma = np.sqrt(np.var(zeta)*2*kappa/(1-b**2))
sigma_eq = np.sqrt(np.var(zeta)/(1-b**2))
# if the speed of mean reversion is high enough, save the calculated parameters
# Tunable parameter: kappa threshold - 252/30 for 1.5 months mean reversion,
if kappa > 252/30:
ou_parameters.loc[stock] = [x.item() for x in [a,b,np.var(zeta),kappa,m,sigma,sigma_eq]]
ou_parameters.dropna(axis=0, inplace=True)
# calculate s-score
ou_parameters['m_bar'] = (ou_parameters['a']/(1 - ou_parameters['b']) -
ou_parameters['a'].mean()/(1-ou_parameters['b'].mean()))
ou_parameters['s'] = -ou_parameters['m_bar'] / ou_parameters['sigma_eq']
s_scores = ou_parameters['s'].to_dict()
return s_scores, betas
def GetNormalizedReturns(self, df):
temp = df.dropna(axis=1).pct_change().dropna()
# Minor formatting for parsing symbols
temp.columns = [self.Symbol(x) for x in temp.columns]
temp_mean = temp.mean()
temp_std = temp.std()
return ((temp-temp_mean)/(temp_std))
def GetWeights(self, s_scores, betas, data):
# Figure out which stocks to long/short/liquidiate positions
# Tunable parameter: s_score threshold to long/short/close
position = dict()
for sym in self.ActiveSecurities.Keys:
# Pass if it is in sectorCodes - as benchmark etfs
if sym in self.sectorCodes:
continue
# in s_scores: kappa test passed
if sym in s_scores:
s = s_scores[sym]
# Check if position closing needed
if self.Portfolio[sym].Invested:
if s < 0.5: # Tunable parameter: Close Long
position[sym] = 0
elif s > -0.5: # Tunable parameter: Close Short
position[sym] = 0
elif self.Portfolio[sym].Quantity > 0: # Remain Long
position[sym] = 1
elif self.Portfolio[sym].Quantity < 0: # Remain Short
position[sym] = -1
else:
position[sym] = 0
else:
if s > 1.25: # Tunable parameter: Open Long
position[sym] = 1
elif s < -1.25: # Tunable parameter: Open Short
position[sym] = -1
else:
position[sym] = 0
# Not in s_scores - kappa test not passed
else:
position[sym] = 0
# Allocate equal weight among long and short position
position_weights = dict()
# Find total# of position
long_sum = sum([position[x] for x in position if position[x] > 0])
short_sum = sum([position[x] for x in position if position[x] < 0])
# Calculate total exposure inc beta hedging
total_exposure = 0
for sym, weight in position.items():
formal_symbol = self.Symbol(str(sym))
if weight > 0 or weight < 0:
exp = abs(weight)
for code in self.sectorCodes:
exp += abs(float(betas.loc[sym, code]))
total_exposure += exp
# average_exposure = total_exposure / (abs(long_sum)+abs(short_sum))
# Tunable parameters: How to assign weight based on s_score threshold
for sym, weight in position.items():
formal_symbol = self.Symbol(str(sym))
if weight > 0:
position_weights[formal_symbol] = 1 / total_exposure
elif weight < 0:
position_weights[formal_symbol] = -1 / total_exposure
else:
position_weights[formal_symbol] = 0
# Calculate hedging beta
hedging_weights = dict()
for code in self.sectorCodes:
hedging_weights[code] = 0
for sym, pos_w in position_weights.items():
for code in hedging_weights:
hedging_weights[code] += (-betas.loc[sym, code] * pos_w)
return {**position_weights,**hedging_weights}
def OnSecuritiesChanged(self, changes):
# Liquidate when the symbols got removed from the universe
for security in changes.RemovedSecurities:
if security.Invested:
self.Liquidate(security.Symbol, 'Removed from Universe')#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.decomposition import PCA
class PcaStatArbitrageAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2011, 1, 1) # Set Start Date
self.SetEndDate(2011, 5, 1) # Set End Date
self.SetCash(100000) # Set Starting Cash Balance
self.nextRebalance = self.Time # Set the next rebalancing time
# Will prefer shorter rebalancing period, e.g. weekly
self.rebalance_days = 0 # Set rebalancing frequency in days
self.lookback = 61 # Historical data length in days
self.num_components = 15 # Number of principal components in PCA
self.num_equities = 500 # Equities universe, proxy for SPX500
self.weights_buy = pd.DataFrame() # Dataframe to buy with symbol as index
self.weights_sell = pd.DataFrame() # Dataframe to sell with symbol as index
self.weights_liquidate = pd.DataFrame() # Dataframe to switch with symbol as index
self.UniverseSettings.Resolution = Resolution.Daily # Use hour resolution for speed
self.SetBenchmark("SPX") # Benchmark
self.AddUniverse(self.CoarseSelectionAndPCA) # Coarse selection + PCA
#self.AddRiskManagement(MaximumDrawdownPercentPerSecurity(0.03))
def CoarseSelectionAndPCA(self, coarse):
'''Drop securities which have too low prices.
Select those with highest by dollar volume.
Finally do PCA and get the selected trading symbols.
'''
# Before next rebalance time, just remain the current universe
if self.Time < self.nextRebalance:
return Universe.Unchanged
### Simple coarse selection first
# Sort the equities in DollarVolume decendingly
selected = sorted([x for x in coarse if x.Price > 5],
key=lambda x: x.DollarVolume, reverse=True)
symbols = [x.Symbol for x in selected[:self.num_equities]]
### After coarse selection, we do PCA and linear regression to get our selected symbols
# Get historical data of the selected symbols
history = self.History(symbols, self.lookback, Resolution.Daily).close.unstack(level=0)
# Select the desired symbols and their weights for the portfolio from the coarse-selected symbols
try:
self.weights_buy,self.weights_sell,self.weights_liquidate = self.GetWeights(history)
except:
self.weights_buy,self.weights_sell,self.weights_liquidate = pd.DataFrame(),pd.DataFrame(),pd.DataFrame()
# If there is no final selected symbols, return the unchanged universe
if self.weights_buy.empty or self.weights_sell.empty or self.weights_liquidate.empty:
return Universe.Unchanged
return [x for x in symbols if str(x) in self.weights_buy.index or str(x) in self.weights_sell.index or str(x) in self.weights_liquidate]
def GetWeights(self, history):
'''
Get the finalized selected symbols and their weights according to their level of deviation
of the residuals from the linear regression after PCA for each symbol
'''
# Sample data for PCA
sample = history.dropna(axis=1).pct_change().dropna()
sample_mean = sample.mean()
sample_std = sample.std()
sample = ((sample-sample_mean)/(sample_std)) # Center it column-wise
# Fit the PCA model for sample data
model = PCA().fit(sample)
#Distributing eigenportfolios
EigenPortfolio = pd.DataFrame(model.components_)
EigenPortfolio.columns = sample.columns
EigenPortfolio = EigenPortfolio/sample_std
EigenPortfolio = ( EigenPortfolio.T / EigenPortfolio.sum(axis=1) )
# Get the first n_components factors
factors = np.dot(sample, EigenPortfolio)[:,:self.num_components]
# Add 1's to fit the linear regression (intercept)
factors = sm.add_constant(factors)
# Train Ordinary Least Squares linear model for each stock
OLSmodels = {ticker: sm.OLS(sample[ticker], factors).fit() for ticker in sample.columns}
# Get the residuals from the linear regression after PCA for each stock
resids = pd.DataFrame({ticker: model.resid for ticker, model in OLSmodels.items()})
# Get the OU parameters
shifted_residuals = resids.cumsum().iloc[1:,:]
resids = resids.cumsum().iloc[:-1,:]
resids.index = shifted_residuals.index
OLSmodels2 = {ticker: sm.OLS(resids[ticker],sm.add_constant(shifted_residuals[ticker])).fit() for ticker in resids.columns}
# Get the new residuals
resids2 = pd.DataFrame({ticker: model.resid for ticker, model in OLSmodels2.items()})
# Get the mean reversion parameters
a = pd.DataFrame({ticker : model.params[0] for ticker , model in OLSmodels2.items()},index=["a"])
b = pd.DataFrame({ticker: model.params[1] for ticker , model in OLSmodels2.items()},index=["a"])
b = b[ b < 0.97 ].dropna()
e = resids2.std() * 252 **( 1 / 2)
k = -np.log(b) * 252
k = k[ k > 252 / 30].dropna()
#Get the z-score
var = (e**2 /(2 * k) )*(1 - np.exp(-2 * k * 252))
num = -a * np.sqrt(1 - b**2)
den =( ( 1-b ) * np.sqrt( var )).dropna(axis=1)
m = ( a / ( 1 - b ) ).dropna(axis=1)
zscores=(num / den ).iloc[0,:] # zscores of the most recent day
# Get the stocks far from mean (for mean reversion)
selected_buy = zscores[zscores < -1.25]
selected_sell = zscores[zscores > 1.25]
selected_liquidate = zscores[abs(zscores) < 0.75 ]
#summing all orders
sum_orders = selected_buy.abs().sum() + selected_sell.abs().sum()
# Return the weights for each selected stock
weights_buy = selected_buy * (1 / sum_orders)
weights_sell = selected_sell * (1 / sum_orders)
weights_liquidate = selected_liquidate
return weights_buy.sort_values(),weights_sell.sort_values(),weights_liquidate.sort_values()
def OnData(self, data):
'''
Rebalance every self.rebalance_days
'''
### Do nothing until next rebalance
if self.Time < self.nextRebalance:
return
### Open positions
for symbol, weight in self.weights_buy.items():
# If the residual is way deviated from 0, we enter the position in the opposite way (mean reversion)
if self.Securities[symbol].Invested:
continue
self.SetHoldings(symbol, -weight)
### short positions
for symbol, weight in self.weights_sell.items():
if self.Securities[symbol].Invested:
continue
self.SetHoldings(symbol,-weight)
for symbol, weight in self.weights_liquidate.items():
self.Liquidate(symbol)
### Update next rebalance time
self.nextRebalance = self.Time + timedelta(self.rebalance_days)
def OnSecuritiesChanged(self, changes):
'''
Liquidate when the symbols are not in the universe
'''
for security in changes.RemovedSecurities:
if security.Invested:
self.Liquidate(security.Symbol, 'Removed from Universe')
# self.SetHoldings("SPY", 1)