Overall Statistics
Total Trades
735
Average Win
2.26%
Average Loss
-0.96%
Compounding Annual Return
31.548%
Drawdown
24.300%
Expectancy
1.553
Net Profit
5008.488%
Sharpe Ratio
1.643
Probabilistic Sharpe Ratio
98.855%
Loss Rate
24%
Win Rate
76%
Profit-Loss Ratio
2.35
Alpha
0.199
Beta
0.252
Annual Standard Deviation
0.133
Annual Variance
0.018
Information Ratio
0.772
Tracking Error
0.178
Treynor Ratio
0.871
Total Fees
$1822.76
Estimated Strategy Capacity
$6000000.00
Lowest Capacity Asset
FDN TJPMW3BHNMUD
#region imports
from AlgorithmImports import *
#endregion
from pykalman import KalmanFilter
import statistics
        
class KalmanFilterIndicator(PythonIndicator):  
    def __init__(self,name, period, selector=Field.Close,
                 transition_matrices = [1], observation_matrices = [1], 
                 initial_state_mean = 0, initial_state_covariance = 1, 
                 observation_covariance=1, transition_covariance=.01):
                     
        self.Name     = name
        self.period   = period
        self.Value    = 0
        self.barCalc  = selector
        
        self.transition_matrices      = transition_matrices
        self.observation_matrices     = observation_matrices
        self.initial_state_mean       = initial_state_mean 
        self.initial_state_covariance = initial_state_covariance
        self.observation_covariance   = observation_covariance
        self.transition_covariance    = transition_covariance
        
        self.rollingWindow = RollingWindow[float](self.period)
    
    
    # ---------------------------------    
    def Update(self, inputBar):
        
        effectiveBarValue = self.barCalc(inputBar) # round(float(statistics.median([Field.Open(inputBar), Field.High(inputBar), Field.Low(inputBar), Field.Close(inputBar)])), 4)# self.barCalc(inputBar) 
        self.rollingWindow.Add(effectiveBarValue)         
        
        if(not self.rollingWindow.IsReady):
            return False
        else:

            basisValue = np.flipud(np.array([self.rollingWindow[i] for i in range(self.period)]))
            
            self.kf = KalmanFilter( transition_matrices = self.transition_matrices,
                                    observation_matrices     = self.observation_matrices,
                                    initial_state_mean       = self.initial_state_mean,
                                    initial_state_covariance = self.initial_state_covariance,
                                    observation_covariance   = self.observation_covariance,
                                    transition_covariance    = self.transition_covariance)
                                    
            #self.kf = self.kf.em(basisValue, n_iter=5)
            
            kf,_ = self.kf.filter(basisValue) # self.kf.smooth(basisValue)
            currKalman = kf[-1]

            self.Value = float(currKalman)
            return True
        
################################################################################
#
# LaguerreFilterIndicator
# ==============================
# Laguerre Filter as defined by John F. Ehlers in `Cybernetic Analysis for 
# Stock and Futures`, 2004, published by Wiley. `ISBN: 978-0-471-46307-8
# https://www.mt5users.com/wp-content/uploads/2020/01/timewarp.pdf
#
# Copied from @vladimir's implementation
# https://www.quantconnect.com/forum/discussion/11788/another-digital-filter-laguerre-filter/p1/comment-34897
# 
################################################################################
        
class LaguerreFilterIndicator(PythonIndicator):  
    def __init__(self, name, gamma ):
        self.Name = name
        self.gamma = gamma
        self.prices = np.array([])
        self.Value = 0
        self.L0 = 0.0; self.L1 = 0.0; self.L2 = 0.0; self.L3 = 0.0
        
    
    def Update(self, input):
        mp = (input.High + input.Low)/2
        self.prices = np.append(self.prices, mp)[-4:]
        if len(self.prices) <= 1:
            self.L0 = mp; self.L1 = mp; self.L2 = mp; self.L3 = mp;
        
        if len(self.prices) != 4 : return
    
        L01 = self.L0; L11 = self.L1; L21 = self.L2; L31 = self.L3;
        g = self.gamma  
        
        self.L0 = (1 - g)*mp + g*L01
        self.L1 = L01 - g*self.L0 + g*L11
        self.L2 = L11 - g*self.L1 + g*L21
        self.L3 = L21 - g*self.L2 + g*L31
        
        if len(self.prices) != 4 :
            self.Value = mp
            return False
        
        self.Value = (self.L0 + (2*self.L1) + 2*(self.L2) + self.L3) / 6
        return True        
        
#region imports
from AlgorithmImports import *
#endregion

###################################################
#
#  Smart Rolling window
#  ========================
#  Convenience object to build on RollingWindow functionality
#
#  Methods:
#  -------------------------
#  mySmartWindow.IsRising()
#  mySmartWindow.IsFalling()
#  mySmartWindow.crossedAboveValue(value)
#  mySmartWindow.crossedBelowValue(value)
#  mySmartWindow.crossedAbove(otherWindow)
#  mySmartWindow.crossedBelow(otherWindow)
#  mySmartWindow.IsFlat(decimalPrecision)
#  mySmartWindow.hasAtLeastThisMany(value)
#
###################################################

class SmartRollingWindow():
    
    def __init__(self, windowType, windowLength):
        self.window    = None
        self.winLength = windowLength

        if (windowType is "int"):self.window = RollingWindow[int](windowLength)
        elif (windowType is "bool"):self.window = RollingWindow[bool](windowLength)
        elif (windowType is "float"):self.window = RollingWindow[float](windowLength)
        elif (windowType is "TradeBar"):self.window = RollingWindow[TradeBar](windowLength)

    def crossedAboveValue(self, value): return (self.window[1] <= value < self.window[0])
    def crossedBelowValue(self, value): return (self.window[1] >= value > self.window[0])

    def crossedAbove(self, series): return (any(self.window[i+1] <= series[i+1] and self.window[i] > series[i] for i in range(0, self.winLength-1)))
    def crossedBelow(self, series): return (any(self.window[i+1] >= series[i+1] and self.window[i] < series[i] for i in range(0, self.winLength-1)))

    def isAbove(self, series): return (self.window[0] > series[0])
    def isBelow(self, series): return (self.window[0] < series[0])
    
    def isFlat(self):    return (self.window[1] == self.window[0])
    def isFalling(self): return (self.window[1] > self.window[0])
    def isRising(self):  return (self.window[1] < self.window[0])

    def Add(self,value): 
        self.window.Add(value)

    def IsReady(self):
        return (self.window is not None) and \
               (self.window.Count >= self.winLength) ## TODO: just use rw.IsReady?
    
    def __getitem__(self, index):
        return self.window[index]