| Overall Statistics |
|
Total Trades 735 Average Win 2.26% Average Loss -0.96% Compounding Annual Return 31.548% Drawdown 24.300% Expectancy 1.553 Net Profit 5008.488% Sharpe Ratio 1.643 Probabilistic Sharpe Ratio 98.855% Loss Rate 24% Win Rate 76% Profit-Loss Ratio 2.35 Alpha 0.199 Beta 0.252 Annual Standard Deviation 0.133 Annual Variance 0.018 Information Ratio 0.772 Tracking Error 0.178 Treynor Ratio 0.871 Total Fees $1822.76 Estimated Strategy Capacity $6000000.00 Lowest Capacity Asset FDN TJPMW3BHNMUD |
#region imports
from AlgorithmImports import *
#endregion
from pykalman import KalmanFilter
import statistics
class KalmanFilterIndicator(PythonIndicator):
def __init__(self,name, period, selector=Field.Close,
transition_matrices = [1], observation_matrices = [1],
initial_state_mean = 0, initial_state_covariance = 1,
observation_covariance=1, transition_covariance=.01):
self.Name = name
self.period = period
self.Value = 0
self.barCalc = selector
self.transition_matrices = transition_matrices
self.observation_matrices = observation_matrices
self.initial_state_mean = initial_state_mean
self.initial_state_covariance = initial_state_covariance
self.observation_covariance = observation_covariance
self.transition_covariance = transition_covariance
self.rollingWindow = RollingWindow[float](self.period)
# ---------------------------------
def Update(self, inputBar):
effectiveBarValue = self.barCalc(inputBar) # round(float(statistics.median([Field.Open(inputBar), Field.High(inputBar), Field.Low(inputBar), Field.Close(inputBar)])), 4)# self.barCalc(inputBar)
self.rollingWindow.Add(effectiveBarValue)
if(not self.rollingWindow.IsReady):
return False
else:
basisValue = np.flipud(np.array([self.rollingWindow[i] for i in range(self.period)]))
self.kf = KalmanFilter( transition_matrices = self.transition_matrices,
observation_matrices = self.observation_matrices,
initial_state_mean = self.initial_state_mean,
initial_state_covariance = self.initial_state_covariance,
observation_covariance = self.observation_covariance,
transition_covariance = self.transition_covariance)
#self.kf = self.kf.em(basisValue, n_iter=5)
kf,_ = self.kf.filter(basisValue) # self.kf.smooth(basisValue)
currKalman = kf[-1]
self.Value = float(currKalman)
return True
################################################################################
#
# LaguerreFilterIndicator
# ==============================
# Laguerre Filter as defined by John F. Ehlers in `Cybernetic Analysis for
# Stock and Futures`, 2004, published by Wiley. `ISBN: 978-0-471-46307-8
# https://www.mt5users.com/wp-content/uploads/2020/01/timewarp.pdf
#
# Copied from @vladimir's implementation
# https://www.quantconnect.com/forum/discussion/11788/another-digital-filter-laguerre-filter/p1/comment-34897
#
################################################################################
class LaguerreFilterIndicator(PythonIndicator):
def __init__(self, name, gamma ):
self.Name = name
self.gamma = gamma
self.prices = np.array([])
self.Value = 0
self.L0 = 0.0; self.L1 = 0.0; self.L2 = 0.0; self.L3 = 0.0
def Update(self, input):
mp = (input.High + input.Low)/2
self.prices = np.append(self.prices, mp)[-4:]
if len(self.prices) <= 1:
self.L0 = mp; self.L1 = mp; self.L2 = mp; self.L3 = mp;
if len(self.prices) != 4 : return
L01 = self.L0; L11 = self.L1; L21 = self.L2; L31 = self.L3;
g = self.gamma
self.L0 = (1 - g)*mp + g*L01
self.L1 = L01 - g*self.L0 + g*L11
self.L2 = L11 - g*self.L1 + g*L21
self.L3 = L21 - g*self.L2 + g*L31
if len(self.prices) != 4 :
self.Value = mp
return False
self.Value = (self.L0 + (2*self.L1) + 2*(self.L2) + self.L3) / 6
return True
#region imports
from AlgorithmImports import *
#endregion
###################################################
#
# Smart Rolling window
# ========================
# Convenience object to build on RollingWindow functionality
#
# Methods:
# -------------------------
# mySmartWindow.IsRising()
# mySmartWindow.IsFalling()
# mySmartWindow.crossedAboveValue(value)
# mySmartWindow.crossedBelowValue(value)
# mySmartWindow.crossedAbove(otherWindow)
# mySmartWindow.crossedBelow(otherWindow)
# mySmartWindow.IsFlat(decimalPrecision)
# mySmartWindow.hasAtLeastThisMany(value)
#
###################################################
class SmartRollingWindow():
def __init__(self, windowType, windowLength):
self.window = None
self.winLength = windowLength
if (windowType is "int"):self.window = RollingWindow[int](windowLength)
elif (windowType is "bool"):self.window = RollingWindow[bool](windowLength)
elif (windowType is "float"):self.window = RollingWindow[float](windowLength)
elif (windowType is "TradeBar"):self.window = RollingWindow[TradeBar](windowLength)
def crossedAboveValue(self, value): return (self.window[1] <= value < self.window[0])
def crossedBelowValue(self, value): return (self.window[1] >= value > self.window[0])
def crossedAbove(self, series): return (any(self.window[i+1] <= series[i+1] and self.window[i] > series[i] for i in range(0, self.winLength-1)))
def crossedBelow(self, series): return (any(self.window[i+1] >= series[i+1] and self.window[i] < series[i] for i in range(0, self.winLength-1)))
def isAbove(self, series): return (self.window[0] > series[0])
def isBelow(self, series): return (self.window[0] < series[0])
def isFlat(self): return (self.window[1] == self.window[0])
def isFalling(self): return (self.window[1] > self.window[0])
def isRising(self): return (self.window[1] < self.window[0])
def Add(self,value):
self.window.Add(value)
def IsReady(self):
return (self.window is not None) and \
(self.window.Count >= self.winLength) ## TODO: just use rw.IsReady?
def __getitem__(self, index):
return self.window[index]