Hey Everyone,

In this installment, I'm going to walk you through how to use and apply Kalman filters in your algorithms. Before we start, I want to note that there are a few Python packages out there for Kalman filters, but we're adapting this example and the Kalman filter class code from this article and demonstrating how you can implement similar ideas using QuantConnect!

Briefly, a Kalman filter is a state-space model applicable to linear dynamic systems -- systems whose state is time-dependent and state variations are represented linearly. The model is used to estimate unknown states of a variable based on a series of past values. The procedure is two-fold: a prediction (estimate) is made by the filter of the current state of a variable and the uncertainty of the estimate itself. When new data is available, these estimates are updated. There is a lot of information available about Kalman filters, and the variety of their applications is pretty astounding, but for now, we're going to use a Kalman filter to estimate the hedge ratio between a pair of equities.

The idea behind the strategy is pretty straightforward: take two equities that are cointegrated and create a long-short portfolio. The premise of this is that the spread between the value of our two positions should be mean-reverting. Anytime the spread deviates from its expected value, one of the assets moved in an unexpected direction and is due to revert back. When the spread diverges, you can take advantage of this by going long or short on the spread.

To illustrate, imagine you have a long position in "AAPL" worth $2000 and a short position in "IBM" worth$2000. This gives you a net spread of $0. Since you expected AAPL and IBM to move together, then if the spread increases significantly above$0, you would short the spread in the expectation that it will return to $0, it's natural equilibrium. Similarly, if the value drops significantly below$0, you would long the spread and capture the profits as its value returns to \$0. In our application, the Kalman filter will be used to track the hedging ratio between our equities to ensure that the portfolio value is stationary, which means it will continue to exhibit mean-reversion behavior.

First, we used a research notebook to test out the Kalman filter class from the article linked above.

import numpy as np from math import floor class KalmanFilter: def __init__(self): self.delta = 1e-4 self.wt = self.delta / (1 - self.delta) * np.eye(2) self.vt = 1e-3 self.theta = np.zeros(2) self.P = np.zeros((2, 2)) self.R = None self.qty = 2000 def update(self, price_one, price_two): # Create the observation matrix of the latest prices # of TLT and the intercept value (1.0) F = np.asarray([price_one, 1.0]).reshape((1, 2)) y = price_two # The prior value of the states \theta_t is # distributed as a multivariate Gaussian with # mean a_t and variance-covariance R_t if self.R is not None: self.R = self.C + self.wt else: self.R = np.zeros((2, 2)) # Calculate the Kalman Filter update # ---------------------------------- # Calculate prediction of new observation # as well as forecast error of that prediction yhat = F.dot(self.theta) et = y - yhat # Q_t is the variance of the prediction of # observations and hence \sqrt{Q_t} is the # standard deviation of the predictions Qt = F.dot(self.R).dot(F.T) + self.vt sqrt_Qt = np.sqrt(Qt) # The posterior value of the states \theta_t is # distributed as a multivariate Gaussian with mean # m_t and variance-covariance C_t At = self.R.dot(F.T) / Qt self.theta = self.theta + At.flatten() * et self.C = self.R - At * F.dot(self.R) hedge_quantity = int(floor(self.qty*self.theta[0])) return et, sqrt_Qt, hedge_quantity

import numpy as np from math import floor import matplotlib.pyplot as plt from KalmanFilter import KalmanFilter qb = QuantBook() symbols = [qb.AddEquity(x).Symbol for x in ['VIA', 'VIAB']] # Initialize Kalman Filter imported from another file kf = KalmanFilter() # Fetch history history = qb.History(qb.Securities.Keys, 10, Resolution.Daily) # Get close prices prices = history.unstack(level=1).close.transpose() # Iterate over prices, update filter, and print results for index, row in prices.iterrows(): via = row.loc['VIA 2T'] viab = row.loc['VIAB 2T'] forecast_error, prediction_std_dev, hedge_quantity = kf.update(via, viab) print(f'{forecast_error} :: {prediction_std_dev} :: {hedge_quantity}') 

The above code allowed us to test out our code in the research environment, and now we can implement it in practice. To do this, we built a simple pairs trading model that uses VIA and VIAB. We didn't test these two equities for cointegration but instead made the assumption that they will move together as they are different share classes of Viacom and should, theoretically, move the same direction with the same magnitude.

 def Initialize(self): self.SetStartDate(2016, 1, 1) # Set Start Date self.SetCash(100000) # Set Strategy Cash self.SetBrokerageModel(AlphaStreamsBrokerageModel()) self.symbols = [self.AddEquity(x, Resolution.Minute).Symbol for x in ['VIA', 'VIAB']] self.kf = KalmanFilter() self.invested = None self.Schedule.On(self.DateRules.EveryDay('VIA'), self.TimeRules.BeforeMarketClose('VIA', 5), self.UpdateAndTrade)

We initialized an instance of the Kalman Filter class in Initialize(), which we then use in the UpdateAndTrade() method to figure out our optimal position sizing -- this is done to maintain a proper ratio of our long and short positions so as to ensure that the spread remains stationary and mean-reverting. In the UpdateAndTrade() method, we update the Kalman filter with the new price data and get the forecast error, prediction standard deviation, and hedge ratio. The trade signals for us are that we long the spread if the forecast error is less than the negative of the standard deviation of the spread and we exit this position if the forecast error is greater than the negative of the standard deviation of the spread. We short the spread (take opposite positions in both equities) when the forecast error is greater than the standard deviation, and we close this position when the forecast error is less than the standard deviation.

This may seem a bit complicated or abstract, but the essential point is that when our forecast error (the difference between the current value of VIA and the Kalman filter's estimate for today of VIA) is negative, the Kalman filter is saying that the current spread is lower than it is expected to be and so it is due to move back to its expected value. Similarly, when the forecast error is larger than the standard deviation of the predictions, then the spread is higher than expected and will drop back to its expected value.

 def UpdateAndTrade(self): # Get recent price and holdings information via = self.CurrentSlice[self.symbols[0]].Close viab = self.CurrentSlice[self.symbols[1]].Close holdings = self.Portfolio[self.symbols[0]] forecast_error, prediction_std_dev, hedge_quantity = self.kf.update(via, viab) if not holdings.Invested: # Long the spread if forecast_error < -prediction_std_dev: insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Down), Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Up)]) self.EmitInsights(insights) self.MarketOrder(self.symbols[1], self.kf.qty) self.MarketOrder(self.symbols[0], -hedge_quantity) # Short the spread elif forecast_error > prediction_std_dev: insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Up), Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Down)]) self.EmitInsights(insights) self.MarketOrder(self.symbols[1], -self.kf.qty) self.MarketOrder(self.symbols[0], hedge_quantity) if holdings.Invested: # Close long position if holdings.IsShort and (forecast_error >= -prediction_std_dev): insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Flat), Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Flat)]) self.EmitInsights(insights) self.Liquidate() self.invested = None # Close short position elif holdings.IsLong and (forecast_error <= prediction_std_dev): insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Flat), Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Flat)]) self.EmitInsights(insights) self.Liquidate() self.invested = None

This is one application of a Kalman filter in finance, and there are countless others. We encourage you to explore building your own Kalman filter class, using the Python libraries, or apply this one to your own research and trading!

(The code for the Kalman filter was taken from an article posted here and the basic strategy is taken from Ernie Chan's book on algorithmic trading)