| Overall Statistics |
|
Total Trades 1315 Average Win 0.06% Average Loss -0.07% Compounding Annual Return -3.477% Drawdown 17.000% Expectancy -0.349 Net Profit -16.232% Sharpe Ratio -1.024 Probabilistic Sharpe Ratio 0.000% Loss Rate 64% Win Rate 36% Profit-Loss Ratio 0.79 Alpha -0.025 Beta 0.022 Annual Standard Deviation 0.023 Annual Variance 0.001 Information Ratio -0.796 Tracking Error 0.11 Treynor Ratio -1.088 Total Fees $109.25 Estimated Strategy Capacity $43000000.00 Lowest Capacity Asset USDHKD 8G |
#region imports
from AlgorithmImports import *
from pykalman import KalmanFilter
from scipy.optimize import minimize
from statsmodels.tsa.vector_ar.vecm import VECM
#endregion
class KalmanFilterStatisticalArbitrageDemo(QCAlgorithm):
    """Mean-reversion strategy on a cointegrated basket of forex pairs.

    Every week the model is refitted (`Recalibrate`): a VECM supplies the
    cointegration vectors, an SLSQP optimisation combines them into a single
    spread, a Kalman filter tracks the spread's mean, and a trading threshold
    is selected from the historical deviations of the spread from that mean.
    Every day before the EURUSD market close (`EveryDayBeforeMarketClose`)
    the live spread is compared against the threshold to enter, flip, or
    liquidate the basket.
    """

    # Number of leading observations used to initialise the Kalman filter.
    _WARMUP_BARS = 20
    # Number of candidate threshold levels evaluated during calibration.
    _THRESHOLD_LEVELS = 50

    def Initialize(self):
        """Configure the backtest, subscribe to data, fit the model and
        register the scheduled events."""
        #1. Required: Five years of backtest history
        self.SetStartDate(2014, 1, 1)
        self.SetEndDate(2019, 1, 1)

        #2. Required: Alpha Streams Models:
        self.SetBrokerageModel(BrokerageName.AlphaStreams)

        #3. Required: Significant AUM Capacity
        self.SetCash(1000000)

        #4. Required: Benchmark to SPY
        self.SetBenchmark("SPY")

        self.assets = ["EURUSD", "GBPUSD", "USDCAD", "USDHKD", "USDJPY"]

        # Add the forex pairs at minute resolution.
        for ticker in self.assets:
            self.AddForex(ticker, Resolution.Minute)

        # Recalibrate() returns early when no history is available, so give
        # the filter a default that the daily handler can test for.
        self.kalmanFilter = None

        # Instantiate our model
        self.Recalibrate()

        # Trading bias of the portfolio:
        # 1 = holding +trading_weight, -1 = holding -trading_weight, 0 = flat.
        self.state = 0

        # Refit the whole model at midnight at the start of every week.
        self.Schedule.On(self.DateRules.WeekStart(),
                         self.TimeRules.At(0, 0),
                         self.Recalibrate)

        # Update the Kalman filter and evaluate the trading rule every day
        # before the EURUSD market close.
        self.Schedule.On(self.DateRules.EveryDay(),
                         self.TimeRules.BeforeMarketClose("EURUSD"),
                         self.EveryDayBeforeMarketClose)

    def Recalibrate(self):
        """Refit the cointegration weights, Kalman filter and threshold.

        Sets `self.kalmanFilter`, `self.currentMean`, `self.currentCov`,
        `self.threshold` and `self.trading_weight`. Returns without changing
        anything when the history request comes back empty.
        """
        history = self.History(self.assets, 252*2, Resolution.Daily)
        if history.empty:
            return

        # Select the close column and then call the unstack method
        # NOTE(review): the resulting column order is assumed to match the
        # order of self.assets, because the weights derived from it are later
        # paired with self.assets by position - confirm.
        data = history['close'].unstack(level=0)
        # Convert into log-price series to eliminate compounding effect
        log_price = np.log(data)

        ### Get Cointegration Vectors
        # Initialize a VECM model following the unit test parameters, then fit to our data.
        vecm_result = VECM(log_price, k_ar_diff=0, coint_rank=len(self.assets)-1, deterministic='n').fit()
        # Obtain the Beta attribute. This is the cointegration subspaces' unit vectors.
        beta = vecm_result.beta
        # One spread series per cointegration vector.
        spread = log_price @ beta

        ### Optimization of Cointegration Subspaces
        subspace_weights = self._optimize_subspace_weights(spread)
        new_spread = spread @ subspace_weights

        ### Kalman Filter
        mean_series = self._fit_kalman_filter(new_spread)
        # Deviation of the spread from its filtered mean (warm-up excluded).
        normalized_spread = new_spread.iloc[self._WARMUP_BARS:] - mean_series

        ### Determine Trading Threshold
        self.threshold = self._select_threshold(normalized_spread.values,
                                                levels=self._THRESHOLD_LEVELS)

        # Set the trading weight. We would like the portfolio absolute total weight is 1 when trading.
        trading_weight = beta @ subspace_weights
        self.trading_weight = trading_weight / np.sum(abs(trading_weight))

    @staticmethod
    def _optimize_subspace_weights(spread):
        """Return weights over the columns of `spread`, scaled to unit L1 norm.

        The weights minimise the squared ratio of the lag-1 cross-covariance
        to the covariance of the combined spread, subject to every weight
        lying in [-1, 1] and the weights summing to zero.
        """
        n_vectors = spread.shape[1]

        # Neither covariance matrix depends on the weights, so compute both
        # once instead of on every objective evaluation. The first row of the
        # lagged frame has no predecessor and is filled with zero.
        lagged_cov = np.cov(spread.T, spread.shift(1).fillna(0).T)[n_vectors:, :n_vectors]
        cov = np.cov(spread.T)

        def portmanteau(w):
            return ((w.T @ lagged_cov @ w) / (w.T @ cov @ w)) ** 2

        # Alternating-sign starting point. The parentheses matter: `-1**i`
        # evaluates to -(1**i) == -1 for every i, which would start the
        # optimiser at an all-negative point violating the sum-to-zero
        # constraint below.
        x0 = np.array([(-1) ** i / n_vectors for i in range(n_vectors)])
        bounds = tuple((-1, 1) for _ in range(n_vectors))
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x)}]

        opt = minimize(portmanteau,
                       x0=x0,
                       bounds=bounds,
                       constraints=constraints,
                       method="SLSQP")

        # Normalize the result
        return opt.x / np.sum(abs(opt.x))

    def _fit_kalman_filter(self, new_spread):
        """Fit the Kalman filter on the warm-up window and roll it forward.

        Stores the fitted filter in `self.kalmanFilter` and the latest state
        in `self.currentMean` / `self.currentCov`. Returns a float array with
        the filtered mean for every observation after the warm-up window.
        """
        warmup = new_spread.iloc[:self._WARMUP_BARS]

        # Use the warm-up data points to optimize the initial state. We assume
        # the market has no regime change so that the transitional matrix and
        # observation matrix is [1].
        kalman_filter = KalmanFilter(transition_matrices = [1],
                                     observation_matrices = [1],
                                     initial_state_mean = warmup.mean(),
                                     observation_covariance = warmup.var(),
                                     em_vars=['transition_covariance', 'initial_state_covariance'])
        self.kalmanFilter = kalman_filter.em(warmup, n_iter=5)
        (filtered_state_means, filtered_state_covariances) = self.kalmanFilter.filter(warmup)

        # Obtain the current Mean and Covariance Matrix expectations.
        self.currentMean = filtered_state_means[-1, :]
        self.currentCov = filtered_state_covariances[-1, :]

        # Roll over the Kalman Filter to obtain the mean series.
        mean_series = np.empty(new_spread.shape[0] - self._WARMUP_BARS)
        for i in range(self._WARMUP_BARS, new_spread.shape[0]):
            (self.currentMean, self.currentCov) = self.kalmanFilter.filter_update(
                filtered_state_mean = self.currentMean,
                filtered_state_covariance = self.currentCov,
                observation = new_spread.iloc[i])
            # The state is one-dimensional; store it as a plain scalar.
            mean_series[i - self._WARMUP_BARS] = float(np.ravel(self.currentMean)[0])

        return mean_series

    @staticmethod
    def _select_threshold(normalized_spread, levels=50, smoothing=1.0):
        """Pick the entry threshold from historical spread deviations.

        `levels` candidate thresholds are spaced evenly between 0 and the
        largest deviation. For each one the fraction of observations above it
        is measured, that frequency curve is smoothed with a first-difference
        penalty weighted by `smoothing`, and the candidate maximising
        (smoothed frequency * level) is returned.
        """
        values = np.asarray(normalized_spread, dtype=float)

        # Candidate threshold levels.
        s0 = np.linspace(0, values.max(), levels)

        # Fraction of observations strictly above each candidate level.
        f_bar = np.array([np.count_nonzero(values > level) for level in s0]) / values.shape[0]

        # First-difference operator: row i holds +1 at column i, -1 at i+1.
        D = np.eye(levels - 1, levels) - np.eye(levels - 1, levels, k=1)

        # Solve (I + smoothing * D'D) f_star = f_bar without forming an
        # explicit inverse.
        f_star = np.linalg.solve(np.eye(levels) + smoothing * D.T @ D, f_bar)

        # argmax returns the first maximiser, as list.index(max(...)) would.
        return s0[np.argmax(f_star * s0)]

    def EveryDayBeforeMarketClose(self):
        """Update the Kalman filter with the live spread and trade on it.

        Takes +trading_weight when the spread is below its filtered mean by
        more than the threshold, -trading_weight when it is above by more
        than the threshold, and liquidates an open position otherwise.
        """
        # Nothing to trade on until a calibration has succeeded.
        if self.kalmanFilter is None:
            return

        # Log close price for every asset, in the same order used to pair
        # self.assets with self.trading_weight when building the orders.
        log_close = np.array([np.log(self.Securities[ticker].Close)
                              for ticker in self.assets])

        # Get the spread
        spread = float(log_close @ self.trading_weight)

        # Update the Kalman Filter with the latest spread observation.
        (self.currentMean, self.currentCov) = self.kalmanFilter.filter_update(
            filtered_state_mean = self.currentMean,
            filtered_state_covariance = self.currentCov,
            observation = spread)

        # Obtain the normalized spread as a plain scalar.
        normalized_spread = float(np.ravel(spread - self.currentMean)[0])

        # ==============================
        # Mean-reversion
        if normalized_spread < -self.threshold:
            self.SetHoldings([PortfolioTarget(ticker, weight)
                              for ticker, weight in zip(self.assets, self.trading_weight)])
            self.state = 1

        elif normalized_spread > self.threshold:
            self.SetHoldings([PortfolioTarget(ticker, -1 * weight)
                              for ticker, weight in zip(self.assets, self.trading_weight)])
            self.state = -1

        # Out of position if spread recovered
        elif ((self.state == 1 and normalized_spread > -self.threshold)
              or (self.state == -1 and normalized_spread < self.threshold)):
            self.Liquidate()
            self.state = 0