| Overall Statistics | |
| --- | --- |
| Total Trades | 1254 |
| Average Win | 0.07% |
| Average Loss | -0.06% |
| Compounding Annual Return | -0.037% |
| Drawdown | 2.600% |
| Expectancy | -0.003 |
| Net Profit | -0.147% |
| Sharpe Ratio | -0.017 |
| Probabilistic Sharpe Ratio | 0.727% |
| Loss Rate | 53% |
| Win Rate | 47% |
| Profit-Loss Ratio | 1.13 |
| Alpha | -0 |
| Beta | -0.003 |
| Annual Standard Deviation | 0.011 |
| Annual Variance | 0 |
| Information Ratio | 0.158 |
| Tracking Error | 0.061 |
| Treynor Ratio | 0.075 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $72000000.00 |
| Lowest Capacity Asset | AUDUSD 8G |
| Portfolio Turnover | 14.21% |
#region imports
from AlgorithmImports import *
from arch.unitroot.cointegration import engle_granger
from pykalman import KalmanFilter
from scipy.optimize import minimize
from statsmodels.tsa.vector_ar.vecm import VECM
#endregion
class KalmanFilterStatisticalArbitrageAlphaModel(AlphaModel):
    """Kalman-filter statistical-arbitrage alpha over a basket of forex pairs.

    On a schedule (``recalibrateInterval``) the model fits a VECM to the hourly
    log prices of the tracked pairs, combines the cointegration vectors into a
    single spread by minimising a lag-1 Portmanteau statistic, fits a 1-D
    Kalman filter to that spread to track its mean, and derives an entry
    threshold from the de-meaned spread. Between recalibrations ``Update``
    prices the live spread, steps the Kalman filter whenever every tracked pair
    has produced a new hourly bar, and emits mean-reversion insights.
    """

    # Leading spread observations used to fit the Kalman filter's initial state;
    # the de-meaned spread used for threshold selection starts after them.
    _KF_WARMUP = 20
    # Number of candidate entry levels tested by SetTradingThreshold.
    _THRESHOLD_LEVELS = 20

    def __init__(self, lookback = 30, recalibrateInterval = timedelta(7)):
        """
        Args:
            lookback: number of hourly prices kept per pair for calibration.
            recalibrateInterval: time between scheduled recalibrations.
        """
        self.lookback = lookback
        self.recalibrateInterval = recalibrateInterval
        self.symbol_data = {}        # Symbol -> SymbolData for every tracked forex pair
        self.kalmanFilter = None
        self.currentMean = None      # latest filtered state mean of the spread
        self.currentCov = None       # latest filtered state covariance of the spread
        self.threshold = None        # entry threshold on the de-meaned spread
        self.trading_weight = {}     # Symbol -> signed, L1-normalised spread weight
        self.state = 0               # +1 long the spread, -1 short the spread, 0 flat
        self.rebalance_time = datetime.min

    def Update(self, algorithm, data):
        """Recalibrate when due, update the Kalman filter and emit spread insights.

        Returns:
            A (possibly empty) list of insights.
        """
        insights = []

        # Only push the next scheduled recalibration out once one succeeds, so a
        # universe that is not ready yet is retried rather than waiting a full interval.
        if algorithm.Time > self.rebalance_time and self.Recalibrate():
            self.rebalance_time = algorithm.Time + self.recalibrateInterval
        if not self.trading_weight:
            return insights

        # Live spread = weighted SUM of the latest cached log prices: the same linear
        # combination (and scale) used to build the calibration series. Iterating the
        # weights rather than symbol_data avoids a KeyError for pairs that were not
        # part of the last calibration.
        spread = sum(float(np.log(algorithm.Securities[symbol].Close)) * weight
                     for symbol, weight in self.trading_weight.items())
        # A missing/zero price makes the log non-finite; skip this step.
        if not np.isfinite(spread):
            return insights

        # Step the Kalman filter only once every tracked pair has a fresh hourly bar.
        if all(symbol_data.updated for symbol_data in self.symbol_data.values()):
            self.currentMean, self.currentCov = self.kalmanFilter.filter_update(
                filtered_state_mean=self.currentMean,
                filtered_state_covariance=self.currentCov,
                observation=spread)
            for symbol_data in self.symbol_data.values():
                symbol_data.updated = False

        # The filtered mean is a 1-element array; take it as a plain scalar.
        normalized_spread = spread - float(np.ravel(self.currentMean)[0])

        if normalized_spread < -self.threshold and self.state != 1:
            # Spread is far below its mean: go long the spread.
            insights.extend(self._spread_insights(algorithm, 1))
            self.state = 1
        elif normalized_spread > self.threshold and self.state != -1:
            # Spread is far above its mean: go short the spread.
            insights.extend(self._spread_insights(algorithm, -1))
            self.state = -1
        elif (self.state == 1 and normalized_spread > 0) or (self.state == -1 and normalized_spread < 0):
            # Spread crossed back through its mean: exit the position.
            algorithm.Insights.Cancel(list(self.symbol_data.keys()))
            self.state = 0
        return insights

    def _spread_insights(self, algorithm, side):
        """Build one insight per open-market leg to go long (side=1) or short (side=-1) the spread.

        A leg's direction is the sign of ``side * weight`` and its insight weight is
        ``|weight|``. Putting the sign in the direction (instead of passing a
        negative weight with a fixed Up/Down direction) makes each leg's side
        explicit to the insight-weighting portfolio construction model.
        """
        insights = []
        for symbol, weight in self.trading_weight.items():
            if not algorithm.IsMarketOpen(symbol):
                continue
            direction = InsightDirection.Up if side * weight >= 0 else InsightDirection.Down
            insights.append(
                Insight.Price(symbol, timedelta(365), direction, weight=float(abs(weight))))
        return insights

    def Recalibrate(self):
        """Refit spread weights, Kalman filter and entry threshold from the price windows.

        Returns:
            True if a new calibration was installed. False if there was too little
            aligned data (fewer than two ready pairs, or no more than ``_KF_WARMUP``
            common timestamps) or the fitted weights were degenerate. In that case
            the previous calibration, if any, is kept.
        """
        log_price = np.log(pd.DataFrame(
            {symbol: data.Price for symbol, data in self.symbol_data.items() if data.IsReady}))
        # Timestamps at which some pair has no bar cannot be used by the VECM.
        log_price = log_price.dropna()
        if log_price.shape[1] < 2 or log_price.shape[0] <= self._KF_WARMUP:
            return False

        # Combine the cointegration subspaces and map them to per-asset weights.
        _, subspace_weights, beta = self.GetSpreads(log_price)
        weights = self.GetTradingWeight(beta, subspace_weights)
        if not np.all(np.isfinite(weights)) or not np.any(weights):
            return False

        # Build the calibration spread from the SAME normalised weights Update uses
        # for the live spread, so the Kalman state and threshold share its scale.
        weighted_spread = log_price @ weights
        mean_series = self.SetKalmanFilter(weighted_spread)
        # De-meaned spread; the Kalman warm-up observations are discarded.
        normalized_spread = weighted_spread.iloc[self._KF_WARMUP:] - mean_series
        self.SetTradingThreshold(normalized_spread)

        # Rebuild (not merge) so pairs absent from this calibration keep no stale weight.
        self.trading_weight = dict(zip(log_price.columns, weights))
        # The filter has already consumed every bar in the windows; clear the flags
        # so Update does not feed the latest bar in a second time.
        for data in self.symbol_data.values():
            data.updated = False
        return True

    def GetSpreads(self, logPriceDf):
        """Fit a VECM to the log prices and return the optimised combined spread.

        Returns:
            (weighted spread series, subspace weights, beta) as produced by OptimizeSpreads.
        """
        # 3 lagged differences, no deterministic term, rank = number of assets - 1.
        vecm_result = VECM(logPriceDf, k_ar_diff=3, coint_rank=logPriceDf.shape[1]-1, deterministic='n').fit()
        # Columns of beta span the cointegration subspaces.
        beta = vecm_result.beta
        # Spread series of each cointegration subspace.
        spread = logPriceDf @ beta
        return self.OptimizeSpreads(spread, beta)

    def OptimizeSpreads(self, spread, beta):
        """Combine the subspace spreads into one spread by minimising lag-1 predictability.

        Chooses weights ``w`` (each in [-1, 1], summing to 0) that minimise the
        squared ratio of the lag-1 to lag-0 covariance quadratic forms of
        ``spread @ w`` (a Portmanteau-style statistic), then L1-normalises them.

        Returns:
            (weighted spread series, normalised weights, beta)
        """
        n_vectors = beta.shape[1]
        if n_vectors == 1:
            # A single cointegration vector: the zero-sum constraint would force w = 0.
            weights = np.array([1.0])
            return spread @ weights, weights, beta

        # Alternating +-1/n starting point. The parentheses matter: -1 ** i == -(1 ** i) == -1.
        x0 = np.array([(-1) ** i / n_vectors for i in range(n_vectors)])
        bounds = tuple((-1, 1) for _ in range(n_vectors))
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x)}]

        # Lag-0 and lag-1 covariances, computed once instead of on every objective call.
        # The first row has no lagged value, so it is dropped rather than zero-filled
        # (a zero log-price spread would badly bias the autocovariance).
        lag0_cov = np.cov(spread.T)
        current = spread.iloc[1:]
        lagged = spread.shift(1).iloc[1:]
        lag1_cov = np.cov(current.T, lagged.T)[n_vectors:, :n_vectors]

        def portmanteau(w):
            return ((w @ lag1_cov @ w) / (w @ lag0_cov @ w)) ** 2

        opt = minimize(portmanteau,
                       x0=x0,
                       bounds=bounds,
                       constraints=constraints,
                       method="SLSQP")

        weights = opt.x
        scale = np.sum(np.abs(weights))
        if not np.isfinite(scale) or scale == 0:
            # Degenerate optimiser output: fall back to the starting point.
            weights, scale = x0, np.sum(np.abs(x0))
        weights = weights / scale
        return spread @ weights, weights, beta

    def SetKalmanFilter(self, weighted_spread):
        """Fit a 1-D Kalman filter on the first ``_KF_WARMUP`` points and roll it over the rest.

        Sets ``self.kalmanFilter``, ``self.currentMean`` and ``self.currentCov``.

        Returns:
            Float array of filtered means, one per observation after the warm-up.
        """
        warmup = weighted_spread.iloc[:self._KF_WARMUP]
        # Transition and observation matrices are [1]: the spread mean is modelled as
        # a random walk (no regime change). EM estimates the transition and initial
        # state covariances on the warm-up window.
        self.kalmanFilter = KalmanFilter(transition_matrices = [1],
                                         observation_matrices = [1],
                                         initial_state_mean = warmup.mean(),
                                         observation_covariance = warmup.var(),
                                         em_vars=['transition_covariance', 'initial_state_covariance'])
        self.kalmanFilter = self.kalmanFilter.em(warmup, n_iter=5)
        filtered_state_means, filtered_state_covariances = self.kalmanFilter.filter(warmup)
        self.currentMean = filtered_state_means[-1, :]
        self.currentCov = filtered_state_covariances[-1, :]

        # Roll the filter over the remaining observations, recording each filtered mean.
        mean_series = np.empty(weighted_spread.shape[0] - self._KF_WARMUP, dtype=float)
        for i, observation in enumerate(weighted_spread.iloc[self._KF_WARMUP:]):
            self.currentMean, self.currentCov = self.kalmanFilter.filter_update(
                filtered_state_mean = self.currentMean,
                filtered_state_covariance = self.currentCov,
                observation = observation)
            mean_series[i] = np.ravel(self.currentMean)[0]
        return mean_series

    def SetTradingThreshold(self, normalized_spread):
        """Pick the entry threshold maximising level x smoothed exceedance frequency.

        Candidate levels are evenly spaced between 0 and the spread's maximum. The
        empirical fraction of observations above each level is smoothed with a
        first-difference penalty (lambda = 1), and the level whose product with its
        smoothed frequency is largest becomes ``self.threshold``.
        """
        values = np.asarray(normalized_spread, dtype=float)
        n_levels = self._THRESHOLD_LEVELS
        s0 = np.linspace(0, values.max(), n_levels)
        # Fraction of observations strictly above each candidate level.
        f_bar = np.array([(values > level).mean() for level in s0])
        # First-difference operator used for the smoothing penalty.
        D = np.zeros((n_levels - 1, n_levels))
        rows = np.arange(n_levels - 1)
        D[rows, rows] = 1
        D[rows, rows + 1] = -1
        smoothing = 1.0
        f_star = np.linalg.solve(np.eye(n_levels) + smoothing * D.T @ D, f_bar)
        # argmax returns the first maximum, as list.index(max(...)) did.
        self.threshold = s0[np.argmax(f_star * s0)]

    def GetTradingWeight(self, beta, weights):
        """Map subspace weights to per-asset weights normalised so that sum(|w|) == 1.

        An all-zero combination is returned unchanged instead of dividing by zero.
        """
        trading_weight = beta @ weights
        total = np.sum(np.abs(trading_weight))
        return trading_weight / total if total > 0 else trading_weight

    def OnSecuritiesChanged(self, algorithm, changes):
        """Track added forex pairs and release the resources of removed ones."""
        for removed in changes.RemovedSecurities:
            symbol_data = self.symbol_data.pop(removed.Symbol, None)
            if symbol_data is not None:
                symbol_data.Dispose()
            if removed.Symbol in self.trading_weight:
                # The spread can no longer be priced: drop the calibration and force
                # a recalibration on the next Update.
                self.trading_weight = {}
                self.rebalance_time = datetime.min
        for added in changes.AddedSecurities:
            symbol = added.Symbol
            if symbol not in self.symbol_data and added.Type == SecurityType.Forex:
                self.symbol_data[symbol] = SymbolData(algorithm, symbol, self.lookback)
class SymbolData:
    """Per-pair state used by the alpha model: an hourly price window plus a new-bar flag.

    Incoming quote data is consolidated into 1-hour quote bars. Each consolidated
    value passes through an identity indicator and is appended to a rolling
    window of ``lookback`` points. ``updated`` is raised whenever a point arrives
    and is cleared by the alpha model once it has consumed the new bar.
    """

    def __init__(self, algorithm, symbol, lookback):
        self.algorithm = algorithm
        self.symbol = symbol
        self.lookback = lookback
        self.updated = False

        # Most recent `lookback` hourly price points (newest first when iterated).
        self.window = RollingWindow[IndicatorDataPoint](lookback)

        # Hourly quote-bar consolidator -> identity price indicator -> OnUpdate -> window.
        hourly = QuoteBarConsolidator(timedelta(hours=1))
        self.consolidator = hourly
        self.price = Identity(f"{symbol} Price")
        self.price.Updated += self.OnUpdate

        # Attach the consolidator/indicator pair to the live data feed.
        algorithm.RegisterIndicator(symbol, self.price, hourly)
        algorithm.SubscriptionManager.AddConsolidator(symbol, hourly)

        # Pre-fill the window by replaying `lookback` historical hourly quote bars.
        for quote_bar in algorithm.History[QuoteBar](symbol, lookback, Resolution.Hour):
            hourly.Update(quote_bar)

    def OnUpdate(self, sender, updated):
        """Store the indicator's new value in the window and flag a fresh bar."""
        point = IndicatorDataPoint(updated.EndTime, updated.Value)
        self.window.Add(point)
        self.updated = True

    def Dispose(self):
        """Detach from the indicator event, clear state and unsubscribe the consolidator."""
        self.price.Updated -= self.OnUpdate
        self.price.Reset()
        self.window.Reset()
        algorithm = self.algorithm
        algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)

    @property
    def IsReady(self):
        """True once the rolling window holds `lookback` points."""
        return self.window.IsReady

    @property
    def Price(self):
        """Hourly prices as a pandas Series indexed by bar end time, oldest first."""
        # The window iterates newest-first; reverse it into chronological order.
        chronological = list(self.window)[::-1]
        return pd.Series(
            data=[point.Value for point in chronological],
            index=[point.EndTime for point in chronological])
# region imports
from AlgorithmImports import *
from alpha import KalmanFilterStatisticalArbitrageAlphaModel
# endregion
class KalmanFilterStatisticalArbitrageAlgorithm(QCAlgorithm):
    """Backtest harness for the Kalman-filter statistical-arbitrage alpha on major forex pairs.

    Trades a fixed Oanda forex universe at minute resolution from 2019-01-01 to
    2023-01-01 on a margin account with 100000 starting cash, sizing positions
    from insight weights.
    """

    def Initialize(self):
        # Backtest window, starting capital and brokerage/account model.
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2023, 1, 1)
        self.SetCash(100000)
        self.SetBrokerageModel(BrokerageName.OandaBrokerage, AccountType.Margin)

        # Fixed universe of major forex pairs, subscribed at minute resolution.
        self.UniverseSettings.Resolution = Resolution.Minute
        majors = ("AUDUSD", "EURUSD", "GBPUSD", "USDCAD", "USDCHF", "USDJPY")
        universe_symbols = [Symbol.Create(ticker, SecurityType.Forex, Market.Oanda) for ticker in majors]
        universe_model = ManualUniverseSelectionModel(universe_symbols)
        self.SetUniverseSelection(universe_model)

        # Signal generation: Kalman-filtered cointegrated-spread mean reversion.
        alpha_model = KalmanFilterStatisticalArbitrageAlphaModel()
        self.AddAlpha(alpha_model)

        # Size positions by insight weight; Expiry.EndOfYear as the rebalance schedule
        # keeps time-driven rebalancing infrequent.
        portfolio_model = InsightWeightingPortfolioConstructionModel(Expiry.EndOfYear)
        self.SetPortfolioConstruction(portfolio_model)