Overall Statistics
Total Trades
1445
Average Win
0.74%
Average Loss
-0.69%
Compounding Annual Return
5.208%
Drawdown
5.500%
Expectancy
0.064
Net Profit
35.336%
Sharpe Ratio
1.022
Probabilistic Sharpe Ratio
49.583%
Loss Rate
49%
Win Rate
51%
Profit-Loss Ratio
1.08
Alpha
0.052
Beta
0.022
Annual Standard Deviation
0.053
Annual Variance
0.003
Information Ratio
-0.031
Tracking Error
0.242
Treynor Ratio
2.407
Total Fees
$0.00
import numpy as np
from math import floor

class KalmanFilter:
    """Kalman filter for a dynamic linear regression between two prices.

    The hidden state ``theta`` holds the slope (``theta[0]``) and intercept
    (``theta[1]``) of ``price_two ~ theta[0] * price_one + theta[1]``. Each
    call to :meth:`update` performs one predict/correct step and reports the
    forecast error, its standard deviation and a hedge size derived from the
    current slope.

    Args:
        delta: Controls the state (system) noise covariance, which is set to
            ``delta / (1 - delta) * I``.
        vt: Observation noise variance.
        qty: Base quantity that is scaled by the slope to obtain the hedge
            quantity returned by :meth:`update`.
    """

    def __init__(self, delta=1e-4, vt=1e-3, qty=2000):
        self.delta = delta
        # State noise covariance W_t.
        self.wt = self.delta / (1 - self.delta) * np.eye(2)
        # Observation noise variance V_t.
        self.vt = vt
        # State mean: [slope, intercept].
        self.theta = np.zeros(2)
        # Not used by update(); retained so existing attribute access keeps working.
        self.P = np.zeros((2, 2))
        # Prior state covariance R_t; None until the first update() call.
        self.R = None
        # Posterior state covariance C_t; overwritten by every update() call.
        self.C = np.zeros((2, 2))
        self.qty = qty

    def update(self, price_one, price_two):
        """Run one filter step on a new pair of prices.

        Args:
            price_one: Latest price of the explanatory series.
            price_two: Latest price of the observed series.

        Returns:
            A tuple ``(et, sqrt_Qt, hedge_quantity)`` where ``et`` is the
            forecast error as an array of shape ``(1,)``, ``sqrt_Qt`` is the
            standard deviation of the forecast as an array of shape
            ``(1, 1)``, and ``hedge_quantity`` is
            ``int(floor(qty * theta[0]))`` computed from the updated slope.
        """
        # Observation matrix: the latest price_one and the intercept value (1.0)
        F = np.asarray([price_one, 1.0]).reshape((1, 2))
        y = price_two

        # The prior value of the states \theta_t is
        # distributed as a multivariate Gaussian with
        # mean a_t and variance-covariance R_t.
        # On the very first call the prior covariance starts at zero, so
        # that call leaves theta unchanged.
        if self.R is not None:
            self.R = self.C + self.wt
        else:
            self.R = np.zeros((2, 2))

        # Calculate the Kalman Filter update
        # ----------------------------------
        # Calculate prediction of new observation
        # as well as forecast error of that prediction
        yhat = F.dot(self.theta)
        et = y - yhat

        # Q_t is the variance of the prediction of
        # observations and hence \sqrt{Q_t} is the
        # standard deviation of the predictions
        Qt = F.dot(self.R).dot(F.T) + self.vt
        sqrt_Qt = np.sqrt(Qt)

        # The posterior value of the states \theta_t is
        # distributed as a multivariate Gaussian with mean
        # m_t and variance-covariance C_t
        At = self.R.dot(F.T) / Qt  # Kalman gain, shape (2, 1)
        self.theta = self.theta + At.flatten() * et
        # (2, 1) * (1, 2) broadcasts to the 2x2 covariance correction.
        self.C = self.R - At * F.dot(self.R)
        hedge_quantity = int(floor(self.qty * self.theta[0]))

        return et, sqrt_Qt, hedge_quantity
import numpy as np
from math import floor
from KalmanFilter import KalmanFilter
class VerticalParticleInterceptor(QCAlgorithm):
    """Pairs-trading algorithm on EWA/EWC sized by a Kalman-filter hedge ratio.

    Once a day, shortly before the close, the latest EWA and EWC closes are
    fed to a ``KalmanFilter``. The spread is opened when the forecast error
    leaves the one-standard-deviation band and closed when it returns.
    """

    def Initialize(self):
        """Set the backtest window, cash, securities and the daily trade event."""
        self.SetStartDate(2006, 4, 26)  # Set Start Date
        self.SetEndDate(2012, 4, 9)
        self.SetCash(100000)  # Set Strategy Cash

        # Registered before the securities are added below.
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

        # symbols[0] is EWA (fed to the filter as price_one), symbols[1] is EWC (price_two).
        self.symbols = [self.AddEquity(x, Resolution.Minute).Symbol for x in ['EWA', 'EWC']]
        self.kf = KalmanFilter()
        # Never read by this class; kept so the attribute still exists.
        self.invested = None

        # Evaluate the strategy every day, 5 minutes before EWA's market close.
        self.Schedule.On(self.DateRules.EveryDay('EWA'), self.TimeRules.BeforeMarketClose('EWA', 5), self.UpdateAndTrade)

    def CustomSecurityInitializer(self, security):
        """Apply the custom fee model and raw price normalization to ``security``."""
        security.SetFeeModel(CustomFeeModel())
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)

    def _latest_close(self, symbol):
        """Return the close of ``symbol`` in the current slice, or None if it has no data."""
        current_slice = self.CurrentSlice
        if current_slice is None or not current_slice.ContainsKey(symbol):
            return None
        bar = current_slice[symbol]
        return None if bar is None else bar.Close

    def _emit_pair_insights(self, ewa_direction, ewc_direction):
        """Emit one grouped pair of one-day price insights for EWA and EWC."""
        insights = Insight.Group([Insight(self.symbols[0], timedelta(1), InsightType.Price, ewa_direction),
                                   Insight(self.symbols[1], timedelta(1), InsightType.Price, ewc_direction)])
        self.EmitInsights(insights)

    def UpdateAndTrade(self):
        """Update the filter with the latest closes and open/close the spread.

        Does nothing when either symbol has no data in the current slice, so
        the filter is only ever updated with a matched pair of prices.
        """
        # Get recent price and holdings information
        ewa = self._latest_close(self.symbols[0])
        ewc = self._latest_close(self.symbols[1])
        if ewa is None or ewc is None:
            return
        holdings = self.Portfolio[self.symbols[0]]

        forecast_error, prediction_std_dev, hedge_quantity = self.kf.update(ewa, ewc)

        # Position state is judged from the EWA leg only, so never open the
        # spread when the EWA order would be zero-sized (e.g. on the first
        # filter update, where the slope is still 0): that would leave EWC
        # unhedged and allow another entry to be stacked on the next run.
        if not holdings.Invested and hedge_quantity != 0:
            # Long the spread
            if forecast_error < -prediction_std_dev:
                self._emit_pair_insights(InsightDirection.Down, InsightDirection.Up)
                self.MarketOrder(self.symbols[1], self.kf.qty)
                self.MarketOrder(self.symbols[0], -hedge_quantity)

            # Short the spread
            elif forecast_error > prediction_std_dev:
                self._emit_pair_insights(InsightDirection.Up, InsightDirection.Down)
                self.MarketOrder(self.symbols[1], -self.kf.qty)
                self.MarketOrder(self.symbols[0], hedge_quantity)

        # Re-read the holdings state on purpose: this check also runs right
        # after the entry orders above have been submitted.
        if holdings.Invested:
            # Close long position (EWA leg is short when the spread is long)
            if holdings.IsShort and (forecast_error >= -prediction_std_dev):
                self._emit_pair_insights(InsightDirection.Flat, InsightDirection.Flat)
                self.Liquidate()

            # Close short position (EWA leg is long when the spread is short)
            elif holdings.IsLong and (forecast_error <= prediction_std_dev):
                self._emit_pair_insights(InsightDirection.Flat, InsightDirection.Flat)
                self.Liquidate()

                
class CustomFeeModel:
    """Fee model that charges nothing for any order."""

    def GetOrderFee(self, parameters):
        """Return a zero USD order fee; ``parameters`` is not inspected."""
        zero_usd = CashAmount(0, 'USD')
        return OrderFee(zero_usd)