| Overall Statistics |
|
Total Trades 567 Average Win 1.10% Average Loss -0.68% Compounding Annual Return 1.784% Drawdown 18.400% Expectancy 0.068 Net Profit 9.244% Sharpe Ratio 0.166 Probabilistic Sharpe Ratio 1.172% Loss Rate 59% Win Rate 41% Profit-Loss Ratio 1.61 Alpha 0.017 Beta 0.023 Annual Standard Deviation 0.113 Annual Variance 0.013 Information Ratio -0.298 Tracking Error 0.208 Treynor Ratio 0.795 Total Fees $1562.11 Estimated Strategy Capacity $1500000.00 Lowest Capacity Asset GLD T3SKPOF94JFP |
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
from math import floor
class KalmanFilter:
def __init__(self):
self.delta1 = 1e-4
self.delta2 = 1e-4
self.delta3 = 1e-4
self.wt1 = self.delta1 / (1 - self.delta1)
self.wt2 = self.delta2 / (1 - self.delta2)
self.wt3 = self.delta3 / (1 - self.delta3)
self.wt = np.array([[self.wt1, 0, 0], [0, self.wt2, 0], [0, 0, self.wt3]])
self.vt = 1e-3
self.theta = np.zeros(3)
self.P = np.zeros((3, 3))
self.R = None
self.qty = 2000
def update(self, price_one, price_two, price_three):
# Create the observation matrix of the latest prices
# of TLT and the intercept value (1.0)
F = np.asarray([price_one, price_two, 1.0]).reshape((1, 3))
y = price_three
# The prior value of the states \theta_t is
# distributed as a multivariate Gaussian with
# mean a_t and variance-covariance R_t
if self.R is not None:
self.R = self.C + self.wt
else:
self.R = np.zeros((3, 3))
# Calculate the Kalman Filter update
# ----------------------------------
# Calculate prediction of new observation
# as well as forecast error of that prediction
yhat = F.dot(self.theta)
et = y - yhat
# Q_t is the variance of the prediction of
# observations and hence \sqrt{Q_t} is the
# standard deviation of the predictions
Qt = F.dot(self.R).dot(F.T) + self.vt
sqrt_Qt = np.sqrt(Qt)
# The posterior value of the states \theta_t is
# distributed as a multivariate Gaussian with mean
# m_t and variance-covariance C_t
At = self.R.dot(F.T) / Qt
self.theta = self.theta + At.flatten() * et
self.C = self.R - At * F.dot(self.R)
hedge_quantity = int(floor(self.qty*self.theta[0]))
return et, sqrt_Qt, hedge_quantity#region imports
from AlgorithmImports import *
#endregion
import numpy as np
from math import floor
from KalmanFilter import KalmanFilter
class VerticalParticleInterceptor(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2018, 1, 1) # Set Start Date
self.SetEndDate(2023, 1, 1) # Set End Date
self.SetCash(100000) # Set Strategy Cash
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
s1 = "SHY"
s2 = "GLD"
s3 = "AAPL"
self.symbols = [self.AddEquity(x, Resolution.Minute).Symbol for x in [s1, s2, s3]]
self.kf = KalmanFilter()
self.invested = None
self.Schedule.On(self.DateRules.EveryDay(s1), self.TimeRules.BeforeMarketClose(s1, 5), self.UpdateAndTrade)
def UpdateAndTrade(self):
# Get recent price and holdings information
sa = self.CurrentSlice[self.symbols[0]].Close
sb = self.CurrentSlice[self.symbols[1]].Close
sc = self.CurrentSlice[self.symbols[2]].Close
holdings = self.Portfolio[self.symbols[0]]
forecast_error, prediction_std_dev, hedge_quantity = self.kf.update(sa, sb, sc)
if not holdings.Invested:
# Long the spread
if forecast_error < -prediction_std_dev:
insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Down),
Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Up),
Insight(self.symbols[2], timedelta(1), InsightType.Price, InsightDirection.Up)])
self.EmitInsights(insights)
self.MarketOrder(self.symbols[2], self.kf.qty * 0.3)
self.MarketOrder(self.symbols[1], self.kf.qty * 0.3)
self.MarketOrder(self.symbols[0], -hedge_quantity * 0.3)
# Short the spread
elif forecast_error > prediction_std_dev:
insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Up),
Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Down),
Insight(self.symbols[2], timedelta(1), InsightType.Price, InsightDirection.Down)])
self.EmitInsights(insights)
self.MarketOrder(self.symbols[2], -self.kf.qty * 0.3)
self.MarketOrder(self.symbols[1], -self.kf.qty * 0.3)
self.MarketOrder(self.symbols[0], hedge_quantity * 0.3)
if holdings.Invested:
# Close long position
if holdings.IsShort and (forecast_error >= -prediction_std_dev):
insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Flat),
Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Flat),
Insight(self.symbols[2], timedelta(1), InsightType.Price, InsightDirection.Flat)])
self.EmitInsights(insights)
self.Liquidate()
self.invested = None
# Close short position
elif holdings.IsLong and (forecast_error <= prediction_std_dev):
insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, InsightDirection.Flat),
Insight(self.symbols[1], timedelta(1), InsightType.Price, InsightDirection.Flat),
Insight(self.symbols[2], timedelta(1), InsightType.Price, InsightDirection.Flat)])
self.EmitInsights(insights)
self.Liquidate()
self.invested = None